This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 560f4833 IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364) 560f4833 is described below commit 560f48332f511edd0bd1efdc1e4d372491ef8779 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Thu Aug 29 12:54:55 2019 +0300 IGNITE-11584 Implement batch insertion of new cache entries in FreeList to improve rebalancing (#6364) --- .../processors/cache/GridCacheEntryEx.java | 39 ++- .../processors/cache/GridCacheMapEntry.java | 35 ++- .../cache/IgniteCacheOffheapManager.java | 25 ++ .../cache/IgniteCacheOffheapManagerImpl.java | 61 +++++ .../dht/preloader/GridDhtPartitionDemander.java | 224 +++++++--------- .../cache/persistence/DataRowCacheAware.java | 68 +++++ .../cache/persistence/GridCacheOffheapManager.java | 16 ++ .../IgniteCacheDatabaseSharedManager.java | 31 +-- .../processors/cache/persistence/RowStore.java | 15 +- .../persistence/evict/NoOpPageEvictionTracker.java | 5 + .../evict/PageAbstractEvictionTracker.java | 15 ++ .../persistence/evict/PageEvictionTracker.java | 7 + .../persistence/freelist/AbstractFreeList.java | 283 +++++++++++++++++---- .../cache/persistence/freelist/FreeList.java | 14 +- .../processors/cache/GridCacheTestEntryEx.java | 3 +- .../processors/database/CacheFreeListSelfTest.java | 186 ++++++++++++-- 16 files changed, 785 insertions(+), 242 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 9aec399..71702ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -792,6 +792,42 @@ public interface GridCacheEntryEx { * @throws IgniteCheckedException In case of error. * @throws GridCacheEntryRemovedException If entry was removed. */ + default boolean initialValue(CacheObject val, + GridCacheVersion ver, + @Nullable MvccVersion mvccVer, + @Nullable MvccVersion newMvccVer, + byte mvccTxState, + byte newMvccTxState, + long ttl, + long expireTime, + boolean preload, + AffinityTopologyVersion topVer, + GridDrType drType, + boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException { + return initialValue(val, ver, null, null, TxState.NA, TxState.NA, + ttl, expireTime, preload, topVer, drType, fromStore, null); + } + + /** + * Sets new value if current version is <tt>0</tt> + * + * @param val New value. + * @param ver Version to use. + * @param mvccVer Mvcc version. + * @param newMvccVer New mvcc version. + * @param mvccTxState Tx state hint for mvcc version. + * @param newMvccTxState Tx state hint for new mvcc version. + * @param ttl Time to live. + * @param expireTime Expiration time. + * @param preload Flag indicating whether entry is being preloaded. + * @param topVer Topology version. + * @param drType DR type. + * @param fromStore {@code True} if value was loaded from store. + * @param row Pre-created data row, associated with this cache entry. + * @return {@code True} if initial value was set. + * @throws IgniteCheckedException In case of error. + * @throws GridCacheEntryRemovedException If entry was removed. + */ public boolean initialValue(CacheObject val, GridCacheVersion ver, @Nullable MvccVersion mvccVer, @@ -803,7 +839,8 @@ public interface GridCacheEntryEx { boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore) throws IgniteCheckedException, GridCacheEntryRemovedException; + boolean fromStore, + @Nullable CacheDataRow row) throws IgniteCheckedException, GridCacheEntryRemovedException; /** * Create versioned entry for this cache entry. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 194c97a..725c01a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -3310,7 +3310,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore + boolean fromStore, + CacheDataRow row ) throws IgniteCheckedException, GridCacheEntryRemovedException { ensureFreeSpace(); @@ -3386,7 +3387,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.offheap().mvccInitialValue(this, val, ver, expTime, mvccVer, newMvccVer); } else - storeValue(val, expTime, ver); + storeValue(val, expTime, ver, null, row); } } else { @@ -3417,7 +3418,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } else // Optimization to access storage only once. - update = storeValue(val, expTime, ver, p); + update = storeValue(val, expTime, ver, p, row); } if (update) { @@ -4257,7 +4258,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme protected boolean storeValue(@Nullable CacheObject val, long expireTime, GridCacheVersion ver) throws IgniteCheckedException { - return storeValue(val, expireTime, ver, null); + return storeValue(val, expireTime, ver, null, null); } /** @@ -4267,6 +4268,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param expireTime Expire time. * @param ver New entry version. * @param predicate Optional predicate. + * @param row Pre-created data row, associated with this cache entry. * @return {@code True} if storage was modified. * @throws IgniteCheckedException If update failed. */ @@ -4274,10 +4276,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable CacheObject val, long expireTime, GridCacheVersion ver, - @Nullable IgnitePredicate<CacheDataRow> predicate) throws IgniteCheckedException { + @Nullable IgnitePredicate<CacheDataRow> predicate, + @Nullable CacheDataRow row + ) throws IgniteCheckedException { assert lock.isHeldByCurrentThread(); - UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); + UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate, row); cctx.offheap().invoke(cctx, key, localPartition(), closure); @@ -5730,12 +5734,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param predicate Optional predicate. */ UpdateClosure(GridCacheMapEntry entry, @Nullable CacheObject val, GridCacheVersion ver, long expireTime, - @Nullable IgnitePredicate<CacheDataRow> predicate) { + @Nullable IgnitePredicate<CacheDataRow> predicate, @Nullable CacheDataRow newRow) { this.entry = entry; this.val = val; this.ver = ver; this.expireTime = expireTime; this.predicate = predicate; + this.newRow = newRow; } /** {@inheritDoc} */ @@ -5755,13 +5760,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (val != null) { - newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( - entry.cctx, - entry.key, - val, - ver, - expireTime, - oldRow); + if (newRow == null) { + newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow( + entry.cctx, + entry.key, + val, + ver, + expireTime, + oldRow); + } treeOp = oldRow != null && oldRow.link() == newRow.link() ? IgniteTree.OperationType.IN_PLACE : IgniteTree.OperationType.PUT; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index ab8d338..e73ad52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import javax.cache.Cache; @@ -29,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow; @@ -47,6 +50,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -504,6 +508,17 @@ public interface IgniteCacheOffheapManager { throws IgniteCheckedException; /** + * Store entries. + * + * @param partId Partition number. + * @param infos Entry infos. + * @param initPred Applied to all created rows. Each row that not matches the predicate is removed. + * @throws IgniteCheckedException If failed. + */ + public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos, + IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException; + + /** * Clears offheap entries. * * @param cctx Cache context. @@ -695,6 +710,16 @@ public interface IgniteCacheOffheapManager { @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** + * Insert rows into page memory. + * + * @param rows Rows. + * @param initPred Applied to all rows. Each row that not matches the predicate is removed. + * @throws IgniteCheckedException If failed. + */ + public void insertRows(Collection<DataRowCacheAware> rows, + IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException; + + /** * @param cctx Cache context. * @param cleanupRows Rows to cleanup. * @throws IgniteCheckedException If failed. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 57cca44..0d9b9fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; +import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow; @@ -110,6 +112,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -122,6 +125,7 @@ import org.jetbrains.annotations.Nullable; import static java.lang.Boolean.TRUE; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.INITIAL_VERSION; import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA; @@ -145,6 +149,9 @@ import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT; * */ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager { + /** The maximum number of entries that can be preloaded under checkpoint read lock. */ + public static final int PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK = 100; + /** */ private final boolean failNodeOnPartitionInconsistency = Boolean.getBoolean( IgniteSystemProperties.IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY); @@ -1207,6 +1214,37 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public void storeEntries(int partId, Iterator<GridCacheEntryInfo> infos, + IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { + CacheDataStore dataStore = dataStore(partId); + + List<DataRowCacheAware> batch = new ArrayList<>(PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK); + + while (infos.hasNext()) { + GridCacheEntryInfo info = infos.next(); + + assert info.ttl() == TTL_ETERNAL : info.ttl(); + + batch.add(new DataRowCacheAware(info.key(), + info.value(), + info.version(), + partId, + info.expireTime(), + info.cacheId(), + grp.storeCacheIdInDataPage())); + + if (batch.size() == PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK) { + dataStore.insertRows(batch, initPred); + + batch.clear(); + } + } + + if (!batch.isEmpty()) + dataStore.insertRows(batch, initPred); + } + + /** {@inheritDoc} */ @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException { CacheDataStore dataStore; @@ -1713,6 +1751,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return dataRow; } + /** {@inheritDoc} */ + @Override public void insertRows(Collection<DataRowCacheAware> rows, + IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + rowStore.addRows(F.view(rows, row -> row.value() != null), grp.statisticsHolderData()); + + boolean cacheIdAwareGrp = grp.sharedGroup() || grp.storeCacheIdInDataPage(); + + for (DataRowCacheAware row : rows) { + row.storeCacheId(cacheIdAwareGrp); + + if (!initPred.apply(row) && row.value() != null) + rowStore.removeRow(row.link(), grp.statisticsHolderData()); + } + } + finally { + busyLock.leaveBusy(); + } + } + /** * @param key Cache key. * @param val Cache value. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 9386193..1b471f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -56,15 +56,15 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; -import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgniteInClosureX; +import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -80,6 +80,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOAD import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED; +import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL; +import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl.PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD; @@ -752,10 +754,16 @@ public class GridDhtPartitionDemander { try { Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator(); - if (grp.mvccEnabled()) - mvccPreloadEntries(topVer, node, p, infos); - else - preloadEntries(topVer, node, p, infos); + try { + if (grp.mvccEnabled()) + mvccPreloadEntries(topVer, node, p, infos); + else + preloadEntries(topVer, p, infos); + } + catch (GridDhtInvalidPartitionException ignored) { + if (log.isDebugEnabled()) + log.debug("Partition became invalid during rebalancing (will ignore): " + p); + } fut.processed.get(p).increment(); @@ -890,7 +898,7 @@ public class GridDhtPartitionDemander { ctx.database().checkpointReadLock(); try { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK; i++) { boolean hasMore = infos.hasNext(); assert hasMore || !entryHist.isEmpty(); @@ -922,20 +930,9 @@ public class GridDhtPartitionDemander { } if (cctx != null) { - if (!mvccPreloadEntry(cctx, node, entryHist, topVer, p)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + - ", entry=" + entryHist.get(entryHist.size() - 1) + ']'); + mvccPreloadEntry(cctx, node, entryHist, topVer, p); - return; // Skip current partition. - } - - //TODO: IGNITE-11330: Update metrics for touched cache only. - for (GridCacheContext ctx : grp.caches()) { - if (ctx.statisticsEnabled()) - ctx.cache().metrics0().onRebalanceKeyReceived(); - } + updateGroupMetrics(); } if (!hasMore) @@ -954,140 +951,100 @@ public class GridDhtPartitionDemander { } /** - * Adds entries with theirs history to partition p. + * Adds entries to partition p. * - * @param node Node which sent entry. + * @param topVer Topology version. * @param p Partition id. * @param infos Entries info for preload. - * @param topVer Topology version. - * @throws IgniteInterruptedCheckedException If interrupted. + * @throws IgniteCheckedException If failed. */ - private void preloadEntries(AffinityTopologyVersion topVer, ClusterNode node, int p, + private void preloadEntries(AffinityTopologyVersion topVer, int p, Iterator<GridCacheEntryInfo> infos) throws IgniteCheckedException { - GridCacheContext cctx = null; - - // Loop through all received entries and try to preload them. - while (infos.hasNext()) { - ctx.database().checkpointReadLock(); - - try { - for (int i = 0; i < 100; i++) { - if (!infos.hasNext()) - break; - - GridCacheEntryInfo entry = infos.next(); - - if (cctx == null || (grp.sharedGroup() && entry.cacheId() != cctx.cacheId())) { - cctx = grp.sharedGroup() ? grp.shared().cacheContext(entry.cacheId()) : grp.singleCacheContext(); - - if (cctx == null) - continue; - else if (cctx.isNear()) - cctx = cctx.dhtCache().context(); - } - if (!preloadEntry(node, p, entry, topVer, cctx)) { - if (log.isTraceEnabled()) - log.trace("Got entries for invalid partition during " + - "preloading (will skip) [p=" + p + ", entry=" + entry + ']'); - - return; - } - - //TODO: IGNITE-11330: Update metrics for touched cache only. - for (GridCacheContext ctx : grp.caches()) { - if (ctx.statisticsEnabled()) - ctx.cache().metrics0().onRebalanceKeyReceived(); - } - } - } - finally { - ctx.database().checkpointReadUnlock(); + grp.offheap().storeEntries(p, infos, new IgnitePredicateX<CacheDataRow>() { + @Override public boolean applyx(CacheDataRow row) throws IgniteCheckedException { + return preloadEntry(row, topVer); } - } + }); } /** * Adds {@code entry} to partition {@code p}. * - * @param from Node which sent entry. - * @param p Partition id. - * @param entry Preloaded entry. + * @param row Data row. * @param topVer Topology version. - * @param cctx Cache context. - * @return {@code False} if partition has become invalid during preloading. + * @return {@code True} if the initial value was set for the specified cache entry. * @throws IgniteInterruptedCheckedException If interrupted. */ - private boolean preloadEntry( - ClusterNode from, - int p, - GridCacheEntryInfo entry, - AffinityTopologyVersion topVer, - GridCacheContext cctx - ) throws IgniteCheckedException { + private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer) throws IgniteCheckedException { + assert !grp.mvccEnabled(); assert ctx.database().checkpointLockIsHeldByThread(); - try { - GridCacheEntryEx cached = null; + updateGroupMetrics(); - try { - cached = cctx.cache().entryEx(entry.key(), topVer); + GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(row.cacheId()) : grp.singleCacheContext(); - if (log.isTraceEnabled()) { - log.trace("Rebalancing key [key=" + entry.key() + ", part=" + p + ", fromNode=" + - from.id() + ", grpId=" + grp.groupId() + ']'); - } + if (cctx == null) + return false; - if (cached.initialValue( - entry.value(), - entry.version(), - cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccVersion() : null, - cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccVersion() : null, - cctx.mvccEnabled() ? ((MvccVersionAware)entry).mvccTxState() : TxState.NA, - cctx.mvccEnabled() ? ((MvccUpdateVersionAware)entry).newMvccTxState() : TxState.NA, - entry.ttl(), - entry.expireTime(), - true, - topVer, - cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, - false - )) { - cached.touch(); // Start tracking. + cctx = cctx.isNear() ? cctx.dhtCache().context() : cctx; - if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) - cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, - null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null, - false, null, null, null, true); - } - else { - cached.touch(); // Start tracking. + GridCacheEntryEx cached = cctx.cache().entryEx(row.key(), topVer); - if (log.isTraceEnabled()) - log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + - ", part=" + p + ']'); - } + try { + if (log.isTraceEnabled()) { + log.trace("Rebalancing key [key=" + cached.key() + ", part=" + cached.partition() + + ", grpId=" + grp.groupId() + ']'); } - catch (GridCacheEntryRemovedException ignored) { - if (log.isTraceEnabled()) - log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + - cached.key() + ", part=" + p + ']'); + + assert row.expireTime() >= 0 : row.expireTime(); + + if (cached.initialValue( + row.value(), + row.version(), + null, + null, + TxState.NA, + TxState.NA, + TTL_ETERNAL, + row.expireTime(), + true, + topVer, + cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE, + false, + row + )) { + cached.touch(); // Start tracking. + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal()) + cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, + null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, row.value(), true, null, + false, null, null, null, true); + + return true; } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); + else { + cached.touch(); // Start tracking. - return false; + if (log.isTraceEnabled()) + log.trace("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() + + ", part=" + cached.partition() + ']'); } } + catch (GridCacheEntryRemovedException ignored) { + if (log.isTraceEnabled()) + log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + + cached.key() + ", part=" + cached.partition() + ']'); + } catch (IgniteInterruptedCheckedException e) { throw e; } catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" + - ctx.localNode() + ", node=" + from.id() + ", key=" + entry.key() + ", part=" + p + ']', e); + throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [" + + "key=" + row.key() + ", part=" + row.partition() + ']', e); } - return true; + return false; } /** @@ -1098,7 +1055,7 @@ public class GridDhtPartitionDemander { * @param history Mvcc entry history. * @param topVer Topology version. * @param p Partition id. - * @return {@code False} if partition has become invalid during preloading. + * @return {@code True} if the initial value was set for the specified cache entry. * @throws IgniteInterruptedCheckedException If interrupted. */ private boolean mvccPreloadEntry( @@ -1131,6 +1088,8 @@ public class GridDhtPartitionDemander { cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(), null, null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, null, true, null, false, null, null, null, true); + + return true; } else { cached.touch(); // Start tracking. @@ -1145,12 +1104,6 @@ public class GridDhtPartitionDemander { log.trace("Entry has been concurrently removed while rebalancing (will ignore) [key=" + cached.key() + ", part=" + p + ']'); } - catch (GridDhtInvalidPartitionException ignored) { - if (log.isDebugEnabled()) - log.debug("Partition became invalid during rebalancing (will ignore): " + p); - - return false; - } } catch (IgniteInterruptedCheckedException | ClusterTopologyCheckedException e) { throw e; @@ -1160,7 +1113,7 @@ public class GridDhtPartitionDemander { ctx.localNode() + ", node=" + from.id() + ", key=" + info.key() + ", part=" + p + ']', e); } - return true; + return false; } /** @@ -1173,6 +1126,19 @@ public class GridDhtPartitionDemander { return "grp=" + grp.cacheOrGroupName() + ", topVer=" + supplyMsg.topologyVersion() + ", supplier=" + supplier; } + /** + * Update rebalancing metrics. + */ + private void updateGroupMetrics() { + // TODO: IGNITE-11330: Update metrics for touched cache only. + // Due to historical rebalancing "EstimatedRebalancingKeys" metric is currently calculated for the whole cache + // group (by partition counters), so "RebalancedKeys" and "RebalancingKeysRate" is calculated in the same way. + for (GridCacheContext cctx0 : grp.caches()) { + if (cctx0.statisticsEnabled()) + cctx0.cache().metrics0().onRebalanceKeyReceived(); + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtPartitionDemander.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java new file mode 100644 index 0000000..baee401 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataRowCacheAware.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.tree.DataRow; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.jetbrains.annotations.Nullable; + +/** + * Data row implementation that can optionally hide the cache identifier and can set {@code null} as value.<br> It is + * used to simplify storing a data row into page memory, because in some cases the cache identifier is not stored on the + * data pages, but is required to link this data row in {@code BPlusTree}. + */ +public class DataRowCacheAware extends DataRow { + /** Flag indicates that cacheId should be stored in data page. */ + private boolean storeCacheId; + + /** + * @param key Key. + * @param val Value. + * @param ver Version. + * @param part Partition. + * @param expireTime Expire time. + * @param cacheId Cache ID. + * @param storeCacheId Flag indicates that cacheId should be stored in data page. + */ + public DataRowCacheAware(KeyCacheObject key, @Nullable CacheObject val, GridCacheVersion ver, int part, + long expireTime, int cacheId, boolean storeCacheId) { + super(key, val, ver, part, expireTime, cacheId); + + storeCacheId(storeCacheId); + } + + /** + * @param storeCacheId Flag indicates that cacheId should be stored in data page. + */ + public void storeCacheId(boolean storeCacheId) { + this.storeCacheId = storeCacheId; + } + + /** {@inheritDoc} */ + @Override public int cacheId() { + return storeCacheId ? cacheId : CU.UNDEFINED_CACHE_ID; + } + + /** {@inheritDoc} */ + @Override public @Nullable CacheObject value() { + return val; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 0b027f1..bbddc1c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -101,6 +102,7 @@ import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; +import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -2383,6 +2385,20 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ + @Override public void insertRows(Collection<DataRowCacheAware> rows, + IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { + CacheDataStore delegate = init0(false); + + ctx.database().checkpointReadLock(); + + try { + delegate.insertRows(rows, initPred); + } finally { + ctx.database().checkpointReadUnlock(); + } + } + + /** {@inheritDoc} */ @Override public int cleanup(GridCacheContext cctx, @Nullable List<MvccLinkAwareSearchRow> cleanupRows) throws IgniteCheckedException { CacheDataStore delegate = init0(false); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index fb40901..aad30fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -994,35 +994,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap if (memPlc == null) return; - DataRegionConfiguration plcCfg = memPlc.config(); + while (memPlc.evictionTracker().evictionRequired()) { + warnFirstEvict(memPlc.config()); - if (plcCfg.getPageEvictionMode() == DataPageEvictionMode.DISABLED || plcCfg.isPersistenceEnabled()) - return; - - long memorySize = plcCfg.getMaxSize(); - - PageMemory pageMem = memPlc.pageMemory(); - - int sysPageSize = pageMem.systemPageSize(); - - CacheFreeList freeList = freeListMap.get(plcCfg.getName()); - - for (;;) { - long allocatedPagesCnt = pageMem.loadedPages(); - - int emptyDataPagesCnt = freeList.emptyDataPages(); - - boolean shouldEvict = allocatedPagesCnt > (memorySize / sysPageSize * plcCfg.getEvictionThreshold()) && - emptyDataPagesCnt < plcCfg.getEmptyPagesPoolSize(); - - if (shouldEvict) { - warnFirstEvict(plcCfg); - - memPlc.evictionTracker().evictDataPage(); + memPlc.evictionTracker().evictDataPage(); - memPlc.memoryMetrics().updateEvictionRate(); - } else - break; + memPlc.memoryMetrics().updateEvictionRate(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index bdd1c2d..8601d98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.cache.persistence; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.CacheGroupContext; @@ -26,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; -import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -118,6 +119,18 @@ public class RowStore { } /** + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @throws IgniteCheckedException If failed. + */ + public void addRows(Collection<? extends CacheDataRow> rows, + IoStatisticsHolder statHolder) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + + freeList.insertDataRows(rows, statHolder); + } + + /** * @param link Row link. * @param row New row data. * @return {@code True} if was able to update row. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java index b420ecd..1d0b171 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/NoOpPageEvictionTracker.java @@ -47,4 +47,9 @@ public class NoOpPageEvictionTracker implements PageEvictionTracker { @Override public void forgetPage(long pageId) throws IgniteCheckedException { // No-op. } + + /** {@inheritDoc} */ + @Override public boolean evictionRequired() { + return false; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java index 41731b1..703fc6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageAbstractEvictionTracker.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; @@ -52,6 +53,9 @@ public abstract class PageAbstractEvictionTracker implements PageEvictionTracker /** Shared context. */ private final GridCacheSharedContext sharedCtx; + /** Data region configuration. */ + private final DataRegionConfiguration regCfg; + /** * @param pageMem Page memory. * @param plcCfg Data region configuration. @@ -66,12 +70,23 @@ public abstract class PageAbstractEvictionTracker implements PageEvictionTracker this.sharedCtx = sharedCtx; + regCfg = plcCfg; + trackingSize = pageMem.totalPages(); baseCompactTs = (U.currentTimeMillis() - DAY) >> COMPACT_TS_SHIFT; // We subtract day to avoid fail in case of daylight shift or timezone change. } + /** {@inheritDoc} */ + @Override public boolean evictionRequired() { + AbstractFreeList freeList = (AbstractFreeList)sharedCtx.database().freeList(regCfg.getName()); + + double pagesThreshold = regCfg.getEvictionThreshold() * regCfg.getMaxSize() / pageMem.systemPageSize(); + + return pageMem.loadedPages() > pagesThreshold && freeList.emptyDataPages() < regCfg.getEmptyPagesPoolSize(); + } + /** * @param pageIdx Page index. * @return true if at least one data row has been evicted diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java index baa5462..fa17ea4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/evict/PageEvictionTracker.java @@ -33,6 +33,13 @@ public interface PageEvictionTracker extends LifecycleAware { public void touchPage(long pageId) throws IgniteCheckedException; /** + * Check if page eviction is required according to the configured policy. + * + * @return {@code True} if eviction required. + */ + public boolean evictionRequired(); + + /** * Evicts one data page. * In most cases, all entries will be removed from the page. * Method guarantees removing at least one entry from "evicted" data page. Removing all entries may be diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java index b677116..cb32219 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java @@ -17,9 +17,12 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.metric.IoStatisticsHolder; +import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageUtils; @@ -40,8 +43,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseB import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener; -import org.apache.ignite.internal.metric.IoStatisticsHolder; -import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp; +import org.apache.ignite.internal.util.GridCursorIteratorWrapper; +import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -130,13 +133,15 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp } } - /** */ - private final PageHandler<T, Integer> writeRow = new WriteRowHandler(); + /** Write a single row on a single page. */ + private final WriteRowHandler writeRowHnd = new WriteRowHandler(); - /** - * - */ - private final class WriteRowHandler extends PageHandler<T, Integer> { + /** Write multiple rows on a single page. */ + private final WriteRowsHandler writeRowsHnd = new WriteRowsHandler(); + + /** */ + private class WriteRowHandler extends PageHandler<T, Integer> { + /** {@inheritDoc} */ @Override public Integer run( int cacheId, long pageId, @@ -148,6 +153,31 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp int written, IoStatisticsHolder statHolder) throws IgniteCheckedException { + written = addRow(pageId, page, pageAddr, iox, row, written); + + putPage(((AbstractDataPageIO)iox).getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder); + + return written; + } + + /** + * @param pageId Page ID. + * @param page Page absolute pointer. + * @param pageAddr Page address. + * @param iox IO. + * @param row Row to write. + * @param written Written size. + * @return Number of bytes written, {@link #COMPLETE} if the row was fully written. + * @throws IgniteCheckedException If failed. + */ + protected Integer addRow( + long pageId, + long page, + long pageAddr, + PageIO iox, + T row, + int written) + throws IgniteCheckedException { AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox; int rowSize = row.size(); @@ -156,18 +186,9 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp assert oldFreeSpace > 0 : oldFreeSpace; // If the full row does not fit into this page write only a fragment. - written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) : + written = (written == 0 && oldFreeSpace >= rowSize) ? addRowFull(pageId, page, pageAddr, io, row, rowSize) : addRowFragment(pageId, page, pageAddr, io, row, written, rowSize); - // Reread free space after update. - int newFreeSpace = io.getFreeSpace(pageAddr); - - if (newFreeSpace > MIN_PAGE_FREE_SPACE) { - int bucket = bucket(newFreeSpace, false); - - put(null, pageId, page, pageAddr, bucket, statHolder); - } - if (written == rowSize) evictionTracker.touchPage(pageId); @@ -185,7 +206,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp * @return Written size which is always equal to row size here. * @throws IgniteCheckedException If failed. */ - private int addRow( + protected int addRowFull( long pageId, long page, long pageAddr, @@ -225,7 +246,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp * @return Updated written size. * @throws IgniteCheckedException If failed. */ - private int addRowFragment( + protected int addRowFragment( long pageId, long page, long pageAddr, @@ -254,6 +275,66 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp return written + payloadSize; } + + /** + * Put page into the free list if needed. + * + * @param freeSpace Page free space. + * @param pageId Page ID. + * @param page Page pointer. + * @param pageAddr Page address. + * @param statHolder Statistics holder to track IO operations. + */ + protected void putPage(int freeSpace, long pageId, long page, long pageAddr, IoStatisticsHolder statHolder) + throws IgniteCheckedException { + if (freeSpace > MIN_PAGE_FREE_SPACE) { + int bucket = bucket(freeSpace, false); + + put(null, pageId, page, pageAddr, bucket, statHolder); + } + } + } + + /** */ + private final class WriteRowsHandler extends PageHandler<GridCursor<T>, Integer> { + /** {@inheritDoc} */ + @Override public Integer run( + int cacheId, + long pageId, + long page, + long pageAddr, + PageIO iox, + Boolean walPlc, + GridCursor<T> cur, + int written, + IoStatisticsHolder statHolder) + throws IgniteCheckedException { + AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox; + + // Fill the page up to the end. + while (written != COMPLETE || (!evictionTracker.evictionRequired() && cur.next())) { + T row = cur.get(); + + if (written == COMPLETE) { + // If the data row was completely written without remainder, proceed to the next. + if ((written = writeWholePages(row, statHolder)) == COMPLETE) + continue; + + if (io.getFreeSpace(pageAddr) < row.size() - written) + break; + } + + written = writeRowHnd.addRow(pageId, page, pageAddr, io, row, written); + + assert written == COMPLETE; + + evictionTracker.touchPage(pageId); + } + + writeRowHnd.putPage(io.getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder); + + return written; + } } /** */ @@ -473,8 +554,6 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp /** {@inheritDoc} */ @Override public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException { - int rowSize = row.size(); - int written = 0; try { @@ -482,62 +561,168 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp if (written != 0) memMetrics.incrementLargeEntriesPages(); - int remaining = rowSize - written; + written = writeSinglePage(row, written, statHolder); + } + while (written != COMPLETE); + } + catch (IgniteCheckedException | Error e) { + throw e; + } + catch (Throwable t) { + throw new CorruptedFreeListException("Failed to insert data row", t); + } + } - long pageId = 0L; + /** + * Reduces the workload on the free list by writing multiple rows into a single memory page at once.<br> + * <br> + * Rows are sequentially added to the page as long as there is enough free space on it. If the row is large then + * those fragments that occupy the whole memory page are written to other pages, and the remainder is added to the + * current one. + * + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @throws IgniteCheckedException If failed. + */ + @Override public void insertDataRows(Collection<T> rows, + IoStatisticsHolder statHolder) throws IgniteCheckedException { + try { + GridCursor<T> cur = new GridCursorIteratorWrapper<>(rows.iterator()); - if (remaining < MIN_SIZE_FOR_DATA_PAGE) { - for (int b = bucket(remaining, false) + 1; b < BUCKETS - 1; b++) { - pageId = takeEmptyPage(b, row.ioVersions(), statHolder); + int written = COMPLETE; - if (pageId != 0L) - break; - } + while (written != COMPLETE || cur.next()) { + T row = cur.get(); + + // If eviction is required - free up memory before locking the next page. + while (evictionTracker.evictionRequired()) { + evictionTracker.evictDataPage(); + + memMetrics.updateEvictionRate(); } - if (pageId == 0L) { // Handle reuse bucket. - if (reuseList == this) - pageId = takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder); - else - pageId = reuseList.takeRecycledPage(); + if (written == COMPLETE) { + written = writeWholePages(row, statHolder); + + continue; } AbstractDataPageIO initIo = null; + long pageId = takePage(row.size() - written, row, statHolder); + if (pageId == 0L) { pageId = allocateDataPage(row.partition()); initIo = row.ioVersions().latest(); } - else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. - pageId = initReusedPage(row, pageId, row.partition(), statHolder); - else // Page is taken from free space bucket. For in-memory mode partition must be changed. - pageId = PageIdUtils.changePartitionId(pageId, (row.partition())); - written = write(pageId, writeRow, initIo, row, written, FAIL_I, statHolder); + written = write(pageId, writeRowsHnd, initIo, cur, written, FAIL_I, statHolder); assert written != FAIL_I; // We can't fail here. } - while (written != COMPLETE); } - catch (IgniteCheckedException | Error e) { - throw e; + catch (RuntimeException e) { + throw new CorruptedFreeListException("Failed to insert data rows", e); } - catch (Throwable t) { - throw new CorruptedFreeListException("Failed to insert data row", t); + } + + /** + * Write fragments of the row, which occupy the whole memory page. A data row is ignored if it is less than the max + * payload of an empty data page. + * + * @param row Row to process. + * @param statHolder Statistics holder to track IO operations. + * @return Number of bytes written, {@link #COMPLETE} if the row was fully written, {@code 0} if data row was + * ignored because it is less than the max payload of an empty data page. + * @throws IgniteCheckedException If failed. + */ + private int writeWholePages(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException { + assert row.link() == 0 : row.link(); + + int written = 0; + int rowSize = row.size(); + + while (rowSize - written >= MIN_SIZE_FOR_DATA_PAGE) { + written = writeSinglePage(row, written, statHolder); + + memMetrics.incrementLargeEntriesPages(); } + + return written; } /** + * Take a page and write row on it. + * + * @param row Row to write. + * @param written Written size. + * @param statHolder Statistics holder to track IO operations. + * @return Number of bytes written, {@link #COMPLETE} if the row was fully written. + * @throws IgniteCheckedException If failed. + */ + private int writeSinglePage(T row, int written, IoStatisticsHolder statHolder) throws IgniteCheckedException { + AbstractDataPageIO initIo = null; + + long pageId = takePage(row.size() - written, row, statHolder); + + if (pageId == 0L) { + pageId = allocateDataPage(row.partition()); + + initIo = row.ioVersions().latest(); + } + + written = write(pageId, writeRowHnd, initIo, row, written, FAIL_I, statHolder); + + assert written != FAIL_I; // We can't fail here. + + return written; + } + + /** + * Take page from free list. + * + * @param size Required free space on page. + * @param row Row to write. + * @param statHolder Statistics holder to track IO operations. + * @return Page identifier or 0 if no page found in free list. + * @throws IgniteCheckedException If failed. + */ + private long takePage(int size, T row, IoStatisticsHolder statHolder) throws IgniteCheckedException { + long pageId = 0; + + if (size < MIN_SIZE_FOR_DATA_PAGE) { + for (int b = bucket(size, false) + 1; b < REUSE_BUCKET; b++) { + pageId = takeEmptyPage(b, row.ioVersions(), statHolder); + + if (pageId != 0L) + break; + } + } + + if (pageId == 0L) { // Handle reuse bucket. + pageId = reuseList == this ? + takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder) : reuseList.takeRecycledPage(); + } + + if (pageId == 0L) + return 0; + + if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Page is taken from reuse bucket. + return initReusedPage(row, pageId, statHolder); + else // Page is taken from free space bucket. For in-memory mode partition must be changed. + return PageIdUtils.changePartitionId(pageId, row.partition()); + } + + /** + * @param row Row. * @param reusedPageId Reused page id. - * @param partId Partition id. * @param statHolder Statistics holder to track IO operations. * @return Prepared page id. * * @see PagesList#initReusedPage(long, long, long, int, byte, PageIO) */ - private long initReusedPage(T row, long reusedPageId, int partId, - IoStatisticsHolder statHolder) throws IgniteCheckedException { + private long initReusedPage(T row, long reusedPageId, IoStatisticsHolder statHolder) throws IgniteCheckedException { long reusedPage = acquirePage(reusedPageId, statHolder); try { long reusedPageAddr = writeLock(reusedPageId, reusedPage); @@ -546,7 +731,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp try { return initReusedPage(reusedPageId, reusedPage, reusedPageAddr, - partId, PageIdAllocator.FLAG_DATA, row.ioVersions().latest()); + row.partition(), PageIdAllocator.FLAG_DATA, row.ioVersions().latest()); } finally { writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java index 28f5a50..5914ae52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java @@ -17,24 +17,34 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist; +import java.util.Collection; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.processors.cache.persistence.Storable; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; -import org.apache.ignite.internal.metric.IoStatisticsHolder; /** */ public interface FreeList<T extends Storable> { /** * @param row Row. + * @param statHolder Statistics holder to track IO operations. * @throws IgniteCheckedException If failed. */ public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException; /** + * @param rows Rows. + * @param statHolder Statistics holder to track IO operations. + * @throws IgniteCheckedException If failed. + */ + public void insertDataRows(Collection<T> rows, IoStatisticsHolder statHolder) throws IgniteCheckedException; + + /** * @param link Row link. * @param row New row data. + * @param statHolder Statistics holder to track IO operations. * @return {@code True} if was able to update row. * @throws IgniteCheckedException If failed. */ @@ -46,6 +56,7 @@ public interface FreeList<T extends Storable> { * @param arg Handler argument. * @param <S> Argument type. * @param <R> Result type. + * @param statHolder Statistics holder to track IO operations. * @return Result. * @throws IgniteCheckedException If failed. */ @@ -54,6 +65,7 @@ public interface FreeList<T extends Storable> { /** * @param link Row link. + * @param statHolder Statistics holder to track IO operations. * @throws IgniteCheckedException If failed. */ public void removeDataRowByLink(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 8d1ab87..f6900b8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -711,7 +711,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr boolean preload, AffinityTopologyVersion topVer, GridDrType drType, - boolean fromStore + boolean fromStore, + CacheDataRow row ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java index 6364590..eae81c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.database; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -74,6 +76,9 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { private static final long MB = 1024L * 1024L; /** */ + private static final int BATCH_SIZE = 100; + + /** */ private PageMemory pageMem; /** {@inheritDoc} */ @@ -90,6 +95,46 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { * @throws Exception if failed. */ @Test + public void testInsertDeleteSingleThreaded_batched_1024() throws Exception { + checkInsertDeleteSingleThreaded(1024, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_2048() throws Exception { + checkInsertDeleteSingleThreaded(2048, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_4096() throws Exception { + checkInsertDeleteSingleThreaded(4096, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_8192() throws Exception { + checkInsertDeleteSingleThreaded(8192, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteSingleThreaded_batched_16384() throws Exception { + checkInsertDeleteSingleThreaded(16384, true); + } + + /** + * @throws Exception if failed. + */ + @Test public void testInsertDeleteSingleThreaded_1024() throws Exception { checkInsertDeleteSingleThreaded(1024); } @@ -167,15 +212,64 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_1024() throws Exception { + checkInsertDeleteMultiThreaded(1024, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_2048() throws Exception { + checkInsertDeleteMultiThreaded(2048, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_4096() throws Exception { + checkInsertDeleteMultiThreaded(4096, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_8192() throws Exception { + checkInsertDeleteMultiThreaded(8192, true); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testInsertDeleteMultiThreaded_batched_16384() throws Exception { + checkInsertDeleteMultiThreaded(16384, true); + } + + /** + * @param pageSize Page size. + * @throws Exception if failed. + */ + protected void checkInsertDeleteMultiThreaded(int pageSize) throws Exception { + checkInsertDeleteMultiThreaded(pageSize, false); + } + + /** * @param pageSize Page size. + * @param batched Batch mode flag. * @throws Exception If failed. */ - protected void checkInsertDeleteMultiThreaded(final int pageSize) throws Exception { - final FreeList list = createFreeList(pageSize); + protected void checkInsertDeleteMultiThreaded(final int pageSize, final boolean batched) throws Exception { + final FreeList<CacheDataRow> list = createFreeList(pageSize); Random rnd = new Random(); - final ConcurrentMap<Long, TestDataRow> stored = new ConcurrentHashMap<>(); + final ConcurrentMap<Long, CacheDataRow> stored = new ConcurrentHashMap<>(); for (int i = 0; i < 100; i++) { int keySize = rnd.nextInt(pageSize * 3 / 2) + 10; @@ -187,7 +281,7 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { assertTrue(row.link() != 0L); - TestDataRow old = stored.put(row.link(), row); + CacheDataRow old = stored.put(row.link(), row); assertNull(old); } @@ -196,6 +290,8 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { GridTestUtils.runMultiThreaded(new Callable<Object>() { @Override public Object call() throws Exception { + List<CacheDataRow> rows = new ArrayList<>(BATCH_SIZE); + Random rnd = ThreadLocalRandom.current(); for (int i = 0; i < 200_000; i++) { @@ -226,25 +322,45 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { TestDataRow row = new TestDataRow(keySize, valSize); + if (batched) { + rows.add(row); + + if (rows.size() == BATCH_SIZE) { + list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE); + + for (CacheDataRow row0 : rows) { + assertTrue(row0.link() != 0L); + + CacheDataRow old = stored.put(row0.link(), row0); + + assertNull(old); + } + + rows.clear(); + } + + continue; + } + list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); assertTrue(row.link() != 0L); - TestDataRow old = stored.put(row.link(), row); + CacheDataRow old = stored.put(row.link(), row); assertNull(old); } else { - while (true) { - Iterator<TestDataRow> it = stored.values().iterator(); + while (!stored.isEmpty()) { + Iterator<CacheDataRow> it = stored.values().iterator(); if (it.hasNext()) { - TestDataRow row = it.next(); + CacheDataRow row = it.next(); - TestDataRow rmvd = stored.remove(row.link); + CacheDataRow rmvd = stored.remove(row.link()); if (rmvd != null) { - list.removeDataRowByLink(row.link, IoStatisticsHolderNoOp.INSTANCE); + list.removeDataRowByLink(row.link(), IoStatisticsHolderNoOp.INSTANCE); break; } @@ -259,14 +375,24 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { } /** + * @param pageSize Page size. * @throws Exception if failed. */ protected void checkInsertDeleteSingleThreaded(int pageSize) throws Exception { - FreeList list = createFreeList(pageSize); + checkInsertDeleteSingleThreaded(pageSize, false); + } + + /** + * @param pageSize Page size. + * @param batched Batch mode flag. + * @throws Exception if failed. + */ + protected void checkInsertDeleteSingleThreaded(int pageSize, boolean batched) throws Exception { + FreeList<CacheDataRow> list = createFreeList(pageSize); Random rnd = new Random(); - Map<Long, TestDataRow> stored = new HashMap<>(); + Map<Long, CacheDataRow> stored = new HashMap<>(); for (int i = 0; i < 100; i++) { int keySize = rnd.nextInt(pageSize * 3 / 2) + 10; @@ -278,13 +404,15 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { assertTrue(row.link() != 0L); - TestDataRow old = stored.put(row.link(), row); + CacheDataRow old = stored.put(row.link(), row); assertNull(old); } boolean grow = true; + List<CacheDataRow> rows = new ArrayList<>(BATCH_SIZE); + for (int i = 0; i < 1_000_000; i++) { if (grow) { if (stored.size() > 20_000) { @@ -309,25 +437,45 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { TestDataRow row = new TestDataRow(keySize, valSize); + if (batched) { + rows.add(row); + + if (rows.size() == BATCH_SIZE) { + list.insertDataRows(rows, IoStatisticsHolderNoOp.INSTANCE); + + for (CacheDataRow row0 : rows) { + assertTrue(row0.link() != 0L); + + CacheDataRow old = stored.put(row0.link(), row0); + + assertNull(old); + } + + rows.clear(); + } + + continue; + } + list.insertDataRow(row, IoStatisticsHolderNoOp.INSTANCE); assertTrue(row.link() != 0L); - TestDataRow old = stored.put(row.link(), row); + CacheDataRow old = stored.put(row.link(), row); assertNull(old); } else { - Iterator<TestDataRow> it = stored.values().iterator(); + Iterator<CacheDataRow> it = stored.values().iterator(); if (it.hasNext()) { - TestDataRow row = it.next(); + CacheDataRow row = it.next(); - TestDataRow rmvd = stored.remove(row.link); + CacheDataRow rmvd = stored.remove(row.link()); assertTrue(rmvd == row); - list.removeDataRowByLink(row.link, IoStatisticsHolderNoOp.INSTANCE); + list.removeDataRowByLink(row.link(), IoStatisticsHolderNoOp.INSTANCE); } } } @@ -355,7 +503,7 @@ public class CacheFreeListSelfTest extends GridCommonAbstractTest { * @return Free list. * @throws Exception If failed. */ - protected FreeList createFreeList(int pageSize) throws Exception { + protected FreeList<CacheDataRow> createFreeList(int pageSize) throws Exception { DataRegionConfiguration plcCfg = new DataRegionConfiguration() .setInitialSize(1024 * MB) .setMaxSize(1024 * MB);