http://git-wip-us.apache.org/repos/asf/ignite/blob/37eed342/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
index 5c66b10..6c51096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.pagemem.Page;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -37,7 +36,6 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListInitNewPageRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListRemovePageRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetNextRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.RecycleRecord;
import org.apache.ignite.internal.processors.cache.database.DataStructure;
import org.apache.ignite.internal.processors.cache.database.freelist.io.PagesListMetaIO;
import org.apache.ignite.internal.processors.cache.database.freelist.io.PagesListNodeIO;
@@ -58,9 +56,6 @@ import static java.lang.Boolean.TRUE;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
import static org.apache.ignite.internal.processors.cache.database.tree.io.PageIO.getPageId;
-import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.initPage;
-import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.isWalDeltaRecordNeeded;
-import static org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler.writePage;
/**
* Striped doubly-linked list of page IDs optionally organized in buckets.
@@ -93,13 +88,20 @@ public abstract class PagesList extends DataStructure {
/**
*
*/
- private class CutTail extends PageHandler<Void, Boolean> {
- /** {@inheritDoc} */
- @Override public Boolean run(Page page, PageIO pageIo, long pageAddr, Void ignore, int bucket)
- throws IgniteCheckedException {
- assert getPageId(pageAddr) == page.id();
-
- PagesListNodeIO io = (PagesListNodeIO)pageIo;
+ private final class CutTail extends PageHandler<Void, Boolean> {
+ @Override
+ public Boolean run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ Boolean walPlc,
+ Void ignore,
+ int bucket) throws IgniteCheckedException {
+ assert getPageId(pageAddr) == pageId;
+
+ PagesListNodeIO io = (PagesListNodeIO)iox;
long tailId = io.getNextId(pageAddr);
@@ -107,10 +109,10 @@ public abstract class PagesList extends DataStructure {
io.setNextId(pageAddr, 0L);
- if (isWalDeltaRecordNeeded(wal, page))
- wal.log(new PagesListSetNextRecord(cacheId, page.id(), 0L));
+ if (needWalDeltaRecord(pageId, page, walPlc))
+ wal.log(new PagesListSetNextRecord(cacheId, pageId, 0L));
- updateTail(bucket, tailId, page.id());
+ updateTail(bucket, tailId, pageId);
return TRUE;
}
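The handler above shows the new PageHandler contract used throughout this patch: instead of a Page object, run() receives the page ID, the absolute page pointer and the locked page address as raw longs, plus an explicit full-page WAL policy (walPlc). Such handlers are now invoked through the write(...) helper on DataStructure, which acquires the page, write-locks it, runs the handler and releases everything. A minimal sketch of a call site, mirroring how cutTail is used further down in this diff (prevId and bucket stand in for the caller's values, and FALSE appears to be the fallback result when the lock cannot be taken):

    // Acquire prevId, write-lock it, run cutTail, then write-unlock and release the page.
    Boolean ok = write(prevId, cutTail, null, bucket, FALSE);

    assert ok == TRUE : ok;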
@@ -153,18 +155,19 @@ public abstract class PagesList extends DataStructure {
protected final void init(long metaPageId, boolean initNew) throws IgniteCheckedException {
if (metaPageId != 0L) {
if (initNew) {
- try (Page page = page(metaPageId)) {
- initPage(pageMem, page, this, PagesListMetaIO.VERSIONS.latest(), wal);
- }
+ init(metaPageId, PagesListMetaIO.VERSIONS.latest());
}
else {
Map<Integer, GridLongList> bucketsData = new HashMap<>();
- long nextPageId = metaPageId;
+ long nextId = metaPageId;
- while (nextPageId != 0) {
- try (Page page = page(nextPageId)) {
- long pageAddr = readLock(page); // No concurrent recycling on init.
+ while (nextId != 0) {
+ final long pageId = nextId;
+ final long page = acquirePage(pageId);
+
+ try {
+ long pageAddr = readLock(pageId, page); // No concurrent recycling on init.
assert pageAddr != 0L;
@@ -173,17 +176,20 @@ public abstract class PagesList extends DataStructure {
io.getBucketsData(pageAddr, bucketsData);
- long next0 = io.getNextMetaPageId(pageAddr);
+ nextId = io.getNextMetaPageId(pageAddr);
+
+ assert nextId != pageId :
+ "Loop detected [next=" + U.hexLong(nextId) +
", cur=" + U.hexLong(pageId) + ']';
- assert next0 != nextPageId :
- "Loop detected [next=" + U.hexLong(next0) + ",
cur=" + U.hexLong(nextPageId) + ']';
- nextPageId = next0;
}
finally {
- readUnlock(page, pageAddr);
+ readUnlock(pageId, page, pageAddr);
}
}
+ finally {
+ releasePage(pageId, page);
+ }
}
for (Map.Entry<Integer, GridLongList> e : bucketsData.entrySet()) {
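The loop above also illustrates the page-access pattern that replaces try-with-resources on Page everywhere in this patch: acquire the page pointer, lock it, and release both in reverse order. A condensed read-path sketch using only the DataStructure helpers visible in this diff (the inner body is a placeholder):

    private void visitPage(long pageId) throws IgniteCheckedException {
        final long page = acquirePage(pageId); // Pin the page in page memory.

        try {
            long pageAddr = readLock(pageId, page); // Take the read lock.

            assert pageAddr != 0L;

            try {
                PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr);

                int cnt = io.getCount(pageAddr); // Read page contents under the lock.
            }
            finally {
                readUnlock(pageId, page, pageAddr); // Unlock before unpinning.
            }
        }
        finally {
            releasePage(pageId, page); // Unpin the page.
        }
    }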
@@ -200,12 +206,14 @@ public abstract class PagesList extends DataStructure {
for (int i = 0; i < upd.length; i++) {
long tailId = upd[i];
- long pageId = tailId;
+ long prevId = tailId;
int cnt = 0;
- while (pageId != 0L) {
- try (Page page = page(pageId)) {
- long pageAddr = readLock(page);
+ while (prevId != 0L) {
+ final long pageId = prevId;
+ final long page = acquirePage(pageId);
+ try {
+ long pageAddr = readLock(pageId, page);
assert pageAddr != 0L;
@@ -213,16 +221,19 @@ public abstract class PagesList extends DataStructure {
PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr);
cnt += io.getCount(pageAddr);
- pageId = io.getPreviousId(pageAddr);
+ prevId = io.getPreviousId(pageAddr);
// In reuse bucket the page itself can be used as a free page.
- if (isReuseBucket(bucket) && pageId != 0L)
+ if (isReuseBucket(bucket) && prevId != 0L)
cnt++;
}
finally {
- readUnlock(page, pageAddr);
+ readUnlock(pageId, page, pageAddr);
}
}
+ finally {
+ releasePage(pageId, page);
+ }
}
Stripe stripe = new Stripe(tailId, cnt == 0);
@@ -245,11 +256,13 @@ public abstract class PagesList extends DataStructure {
public void saveMetadata() throws IgniteCheckedException {
assert metaPageId != 0;
- Page curPage = null;
- long curPageAddr = 0L;
+ long curId = 0L;
+ long cur = 0L;
+ long curAddr = 0L;
+
PagesListMetaIO curIo = null;
- long nextPageId = metaPageId;
+ long nextId = metaPageId;
try {
for (int bucket = 0; bucket < buckets; bucket++) {
@@ -259,39 +272,39 @@ public abstract class PagesList extends DataStructure {
int tailIdx = 0;
while (tailIdx < tails.length) {
- int written = curPage != null ? curIo.addTails(pageMem.pageSize(), curPageAddr, bucket, tails, tailIdx) : 0;
+ int written = curAddr != 0L ? curIo.addTails(pageMem.pageSize(), curAddr, bucket, tails, tailIdx) : 0;
if (written == 0) {
- if (nextPageId == 0L) {
- nextPageId = allocatePageNoReuse();
+ if (nextId == 0L) {
+ nextId = allocatePageNoReuse();
- if (curPage != null) {
- curIo.setNextMetaPageId(curPageAddr, nextPageId);
+ if (curAddr != 0L) {
+ curIo.setNextMetaPageId(curAddr, nextId);
- releaseAndClose(curPage, curPageAddr);
- curPage = null;
+ releaseAndClose(curId, cur, curAddr);
}
- curPage = page(nextPageId);
- curPageAddr = writeLock(curPage);
+ curId = nextId;
+ cur = acquirePage(curId);
+ curAddr = writeLock(curId, cur);
curIo = PagesListMetaIO.VERSIONS.latest();
- curIo.initNewPage(curPageAddr, nextPageId, pageSize());
+ curIo.initNewPage(curAddr, curId, pageSize());
}
else {
- releaseAndClose(curPage, curPageAddr);
- curPage = null;
+ releaseAndClose(curId, cur, curAddr);
- curPage = page(nextPageId);
- curPageAddr = writeLock(curPage);
+ curId = nextId;
+ cur = acquirePage(curId);
+ curAddr = writeLock(curId, cur);
- curIo = PagesListMetaIO.VERSIONS.forPage(curPageAddr);
+ curIo = PagesListMetaIO.VERSIONS.forPage(curAddr);
- curIo.resetCount(curPageAddr);
+ curIo.resetCount(curAddr);
}
- nextPageId = curIo.getNextMetaPageId(curPageAddr);
+ nextId = curIo.getNextMetaPageId(curAddr);
}
else
tailIdx += written;
@@ -300,44 +313,50 @@ public abstract class PagesList extends DataStructure {
}
}
finally {
- releaseAndClose(curPage, curPageAddr);
+ releaseAndClose(curId, cur, curAddr);
}
- while (nextPageId != 0L) {
- try (Page page = page(nextPageId)) {
- long pageAddr = writeLock(page);
+ while (nextId != 0L) {
+ long pageId = nextId;
+
+ long page = acquirePage(pageId);
+ try {
+ long pageAddr = writeLock(pageId, page);
try {
PagesListMetaIO io = PagesListMetaIO.VERSIONS.forPage(pageAddr);
io.resetCount(pageAddr);
- if (PageHandler.isWalDeltaRecordNeeded(wal, page))
- wal.log(new PageListMetaResetCountRecord(cacheId, nextPageId));
+ if (needWalDeltaRecord(pageId, page, null))
+ wal.log(new PageListMetaResetCountRecord(cacheId, pageId));
- nextPageId = io.getNextMetaPageId(pageAddr);
+ nextId = io.getNextMetaPageId(pageAddr);
}
finally {
- writeUnlock(page, pageAddr, true);
+ writeUnlock(pageId, page, pageAddr, true);
}
}
+ finally {
+ releasePage(pageId, page);
+ }
}
}
/**
- * @param page Page.
- * @param buf Buffer.
+ * @param pageId Page ID.
+ * @param page Page absolute pointer.
+ * @param pageAddr Page address.
+ * @throws IgniteCheckedException If failed.
*/
- private void releaseAndClose(Page page, long buf) {
- if (page != null) {
+ private void releaseAndClose(long pageId, long page, long pageAddr) throws IgniteCheckedException {
+ if (pageAddr != 0L) {
try {
// No special WAL record because we most likely changed the whole page.
- page.fullPageWalRecordPolicy(true);
-
- writeUnlock(page, buf, true);
+ writeUnlock(pageId, page, pageAddr, TRUE, true);
}
finally {
- page.close();
+ releasePage(pageId, page);
}
}
}
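The rewritten releaseAndClose also shows where the full-page WAL policy went: instead of calling fullPageWalRecordPolicy(true) on a Page object, the Boolean policy is now passed straight to writeUnlock (TRUE forcing a full-page record, as the comment above says). A condensed write-path sketch of the same acquire/lock/release nesting, again reusing only helpers that appear in this patch (the mutation step is a placeholder):

    private void mutatePage(long pageId) throws IgniteCheckedException {
        final long page = acquirePage(pageId);

        try {
            final long pageAddr = writeLock(pageId, page); // 0L would mean the lock failed.

            assert pageAddr != 0L;

            boolean dirty = false;

            try {
                // Mutate the page through its IO here, then mark it dirty.
                dirty = true;
            }
            finally {
                // Passing TRUE as the WAL policy forces a full-page record instead of deltas.
                writeUnlock(pageId, page, pageAddr, TRUE, dirty);
            }
        }
        finally {
            releasePage(pageId, page);
        }
    }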
@@ -389,9 +408,7 @@ public abstract class PagesList extends DataStructure {
private Stripe addStripe(int bucket, boolean reuse) throws IgniteCheckedException {
long pageId = reuse ? allocatePage(null) : allocatePageNoReuse();
- try (Page page = page(pageId)) {
- initPage(pageMem, page, this, PagesListNodeIO.VERSIONS.latest(), wal);
- }
+ init(pageId, PagesListNodeIO.VERSIONS.latest());
Stripe stripe = new Stripe(pageId, true);
@@ -518,28 +535,37 @@ public abstract class PagesList extends DataStructure {
if (tails != null) {
for (Stripe tail : tails) {
- long pageId = tail.tailId;
+ long tailId = tail.tailId;
- while (pageId != 0L) {
- try (Page page = page(pageId)) {
- long pageAddr = readLock(page);
+ while (tailId != 0L) {
+ final long pageId = tailId;
+ final long page = acquirePage(pageId);
+ try {
+ long pageAddr = readLock(pageId, page);
assert pageAddr != 0L;
try {
PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr);
- res += io.getCount(pageAddr);
- pageId = io.getPreviousId(pageAddr);
+ int cnt = io.getCount(pageAddr);
+
+ assert cnt >= 0;
+
+ res += cnt;
+ tailId = io.getPreviousId(pageAddr);
// In reuse bucket the page itself can be used as a free page.
- if (isReuseBucket(bucket) && pageId != 0L)
+ if (isReuseBucket(bucket) && tailId != 0L)
res++;
}
finally {
- readUnlock(page, pageAddr);
+ readUnlock(pageId, page, pageAddr);
}
}
+ finally {
+ releasePage(pageId, page);
+ }
}
}
}
@@ -551,48 +577,55 @@ public abstract class PagesList extends DataStructure {
/**
* @param bag Reuse bag.
- * @param dataPage Data page.
- * @param dataPageAddr Data page address.
+ * @param dataId Data page ID.
+ * @param dataPage Data page pointer.
+ * @param dataAddr Data page address.
* @param bucket Bucket.
* @throws IgniteCheckedException If failed.
*/
- protected final void put(ReuseBag bag, Page dataPage, long dataPageAddr, int bucket)
+ protected final void put(
+ ReuseBag bag,
+ final long dataId,
+ final long dataPage,
+ final long dataAddr,
+ int bucket)
throws IgniteCheckedException {
- assert bag == null ^ dataPageAddr == 0L;
+ assert bag == null ^ dataAddr == 0L;
for (int lockAttempt = 0; ;) {
Stripe stripe = getPageForPut(bucket);
- long tailId = stripe.tailId;
+ final long tailId = stripe.tailId;
+ final long tailPage = acquirePage(tailId);
- try (Page tail = page(tailId)) {
- long pageAddr = writeLockPage(tail, bucket, lockAttempt++); // Explicit check.
+ try {
+ long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++); // Explicit check.
- if (pageAddr == 0L) {
+ if (tailAddr == 0L) {
if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS)
addStripeForReuseBucket(bucket);
continue;
}
- assert PageIO.getPageId(pageAddr) == tailId : "pageId = " + PageIO.getPageId(pageAddr) + ", tailId = " + tailId;
- assert PageIO.getType(pageAddr) == PageIO.T_PAGE_LIST_NODE;
+ assert PageIO.getPageId(tailAddr) == tailId : "pageId = " + PageIO.getPageId(tailAddr) + ", tailId = " + tailId;
+ assert PageIO.getType(tailAddr) == PageIO.T_PAGE_LIST_NODE;
boolean ok = false;
try {
- PagesListNodeIO io = PageIO.getPageIO(pageAddr);
+ PagesListNodeIO io = PageIO.getPageIO(tailAddr);
ok = bag != null ?
// Here we can always take pages from the bag to build our list.
- putReuseBag(tailId, tail, pageAddr, io, bag, bucket) :
+ putReuseBag(tailId, tailPage, tailAddr, io, bag, bucket) :
// Here we can use the data page to build list only if it is empty and
// it is being put into reuse bucket. Usually this will be true, but there is
// a case when there is no reuse bucket in the free list, but then deadlock
// on node page allocation from separate reuse list is impossible.
// If the data page is not empty it can not be put into reuse bucket and thus
// the deadlock is impossible as well.
- putDataPage(tailId, tail, pageAddr, io, dataPage, dataPageAddr, bucket);
+ putDataPage(tailId, tailPage, tailAddr, io, dataId, dataPage, dataAddr, bucket);
if (ok) {
stripe.empty = false;
@@ -601,52 +634,55 @@ public abstract class PagesList extends DataStructure {
}
}
finally {
- writeUnlock(tail, pageAddr, ok);
+ writeUnlock(tailId, tailPage, tailAddr, ok);
}
}
+ finally {
+ releasePage(tailId, tailPage);
+ }
}
}
/**
* @param pageId Page ID.
- * @param page Page.
+ * @param page Page pointer.
* @param pageAddr Page address.
* @param io IO.
- * @param dataPage Data page.
- * @param dataPageAddr Data page address.
+ * @param dataId Data page ID.
+ * @param dataPage Data page pointer.
+ * @param dataAddr Data page address.
* @param bucket Bucket.
* @return {@code true} If succeeded.
* @throws IgniteCheckedException If failed.
*/
private boolean putDataPage(
- long pageId,
- Page page,
- long pageAddr,
+ final long pageId,
+ final long page,
+ final long pageAddr,
PagesListNodeIO io,
- Page dataPage,
- long dataPageAddr,
+ final long dataId,
+ final long dataPage,
+ final long dataAddr,
int bucket
) throws IgniteCheckedException {
if (io.getNextId(pageAddr) != 0L)
return false; // Splitted.
- long dataPageId = dataPage.id();
-
- int idx = io.addPage(pageAddr, dataPageId, pageSize());
+ int idx = io.addPage(pageAddr, dataId, pageSize());
if (idx == -1)
- handlePageFull(pageId, page, pageAddr, io, dataPage, dataPageAddr, bucket);
+ handlePageFull(pageId, page, pageAddr, io, dataId, dataPage, dataAddr, bucket);
else {
bucketsSize[bucket].incrementAndGet();
- if (isWalDeltaRecordNeeded(wal, page))
- wal.log(new PagesListAddPageRecord(cacheId, pageId, dataPageId));
+ if (needWalDeltaRecord(pageId, page, null))
+ wal.log(new PagesListAddPageRecord(cacheId, pageId, dataId));
- DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageAddr);
- dataIO.setFreeListPageId(dataPageAddr, pageId);
+ DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataAddr);
+ dataIO.setFreeListPageId(dataAddr, pageId);
- if (isWalDeltaRecordNeeded(wal, dataPage))
- wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPage.id(), pageId));
+ if (needWalDeltaRecord(dataId, dataPage, null))
+ wal.log(new DataPageSetFreeListPageRecord(cacheId, dataId, pageId));
}
return true;
@@ -654,74 +690,76 @@ public abstract class PagesList extends DataStructure {
/**
* @param pageId Page ID.
- * @param page Page.
+ * @param page Page pointer.
* @param pageAddr Page address.
* @param io IO.
- * @param dataPage Data page.
- * @param dataPageAddr Data page address.
+ * @param dataId Data page ID.
+ * @param data Data page pointer.
+ * @param dataAddr Data page address.
* @param bucket Bucket index.
* @throws IgniteCheckedException If failed.
- */
+ * */
private void handlePageFull(
- long pageId,
- Page page,
- long pageAddr,
+ final long pageId,
+ final long page,
+ final long pageAddr,
PagesListNodeIO io,
- Page dataPage,
- long dataPageAddr,
+ final long dataId,
+ final long data,
+ final long dataAddr,
int bucket
) throws IgniteCheckedException {
- long dataPageId = dataPage.id();
- DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataPageAddr);
+ DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataAddr);
// Attempt to add page failed: the node page is full.
if (isReuseBucket(bucket)) {
// If we are on the reuse bucket, we can not allocate new page, because it may cause deadlock.
- assert dataIO.isEmpty(dataPageAddr); // We can put only empty data pages to reuse bucket.
+ assert dataIO.isEmpty(dataAddr); // We can put only empty data pages to reuse bucket.
// Change page type to index and add it as next node page to this list.
- dataPageId = PageIdUtils.changeType(dataPageId, FLAG_IDX);
+ long newDataId = PageIdUtils.changeType(dataId, FLAG_IDX);
- setupNextPage(io, pageId, pageAddr, dataPageId, dataPageAddr);
+ setupNextPage(io, pageId, pageAddr, newDataId, dataAddr);
- if (isWalDeltaRecordNeeded(wal, page))
- wal.log(new PagesListSetNextRecord(cacheId, pageId, dataPageId));
+ if (needWalDeltaRecord(pageId, page, null))
+ wal.log(new PagesListSetNextRecord(cacheId, pageId, newDataId));
- if (isWalDeltaRecordNeeded(wal, dataPage))
+ if (needWalDeltaRecord(dataId, data, null))
wal.log(new PagesListInitNewPageRecord(
cacheId,
- dataPageId,
+ dataId,
io.getType(),
io.getVersion(),
- dataPageId,
+ newDataId,
pageId, 0L));
// In reuse bucket the page itself can be used as a free page.
bucketsSize[bucket].incrementAndGet();
- updateTail(bucket, pageId, dataPageId);
+ updateTail(bucket, pageId, newDataId);
}
else {
// Just allocate a new node page and add our data page there.
- long nextId = allocatePage(null);
+ final long nextId = allocatePage(null);
+ final long nextPage = acquirePage(nextId);
- try (Page next = page(nextId)) {
- long nextPageAddr = writeLock(next); // Newly allocated page.
+ try {
+ long nextPageAddr = writeLock(nextId, nextPage); // Newly allocated page.
assert nextPageAddr != 0L;
+ // Here we should never write full page, because it is known to be new.
+ Boolean nextWalPlc = FALSE;
+
try {
setupNextPage(io, pageId, pageAddr, nextId, nextPageAddr);
- if (isWalDeltaRecordNeeded(wal, page))
+ if (needWalDeltaRecord(pageId, page, null))
wal.log(new PagesListSetNextRecord(cacheId, pageId, nextId));
- int idx = io.addPage(nextPageAddr, dataPageId, pageSize());
+ int idx = io.addPage(nextPageAddr, dataId, pageSize());
- // Here we should never write full page, because it is known to be new.
- next.fullPageWalRecordPolicy(FALSE);
-
- if (isWalDeltaRecordNeeded(wal, next))
+ if (needWalDeltaRecord(nextId, nextPage, nextWalPlc))
wal.log(new PagesListInitNewPageRecord(
cacheId,
nextId,
@@ -729,30 +767,33 @@ public abstract class PagesList extends DataStructure {
io.getVersion(),
nextId,
pageId,
- dataPageId
+ dataId
));
assert idx != -1;
- dataIO.setFreeListPageId(dataPageAddr, nextId);
+ dataIO.setFreeListPageId(dataAddr, nextId);
- if (isWalDeltaRecordNeeded(wal, dataPage))
- wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, nextId));
+ if (needWalDeltaRecord(dataId, data, null))
+ wal.log(new DataPageSetFreeListPageRecord(cacheId, dataId, nextId));
bucketsSize[bucket].incrementAndGet();
updateTail(bucket, pageId, nextId);
}
finally {
- writeUnlock(next, nextPageAddr, true);
+ writeUnlock(nextId, nextPage, nextPageAddr, nextWalPlc, true);
}
}
+ finally {
+ releasePage(nextId, nextPage);
+ }
}
}
/**
* @param pageId Page ID.
- * @param page Page.
+ * @param page Page pointer.
* @param pageAddr Page address.
* @param io IO.
* @param bag Reuse bag.
@@ -763,7 +804,7 @@ public abstract class PagesList extends DataStructure {
@SuppressWarnings("ForLoopReplaceableByForEach")
private boolean putReuseBag(
final long pageId,
- Page page,
+ final long page,
final long pageAddr,
PagesListNodeIO io,
ReuseBag bag,
@@ -773,39 +814,41 @@ public abstract class PagesList extends DataStructure {
return false; // Splitted.
long nextId;
- long prevPageAddr = pageAddr;
+
long prevId = pageId;
+ long prevPage = page;
+ long prevAddr = pageAddr;
- List<Page> locked = null; // TODO may be unlock right away and do not keep all these pages locked?
- List<Long> lockedAddrs = null;
+ Boolean walPlc = null;
+
+ List<long[]> locked = null; // TODO may be unlock right away and do not keep all these pages locked?
try {
while ((nextId = bag.pollFreePage()) != 0L) {
- int idx = io.addPage(prevPageAddr, nextId, pageSize());
+ int idx = io.addPage(prevAddr, nextId, pageSize());
if (idx == -1) { // Attempt to add page failed: the node page is full.
- try (Page next = page(nextId)) {
- long nextPageAddr = writeLock(next); // Page from reuse bag can't be concurrently recycled.
+
+ final long nextPage = acquirePage(nextId);
+
+ try {
+ long nextPageAddr = writeLock(nextId, nextPage); // Page from reuse bag can't be concurrently recycled.
assert nextPageAddr != 0L;
if (locked == null) {
- lockedAddrs = new ArrayList<>(2);
locked = new ArrayList<>(2);
}
- locked.add(next);
- lockedAddrs.add(nextPageAddr);
+ locked.add(new long[]{nextId, nextPage, nextPageAddr});
- setupNextPage(io, prevId, prevPageAddr, nextId, nextPageAddr);
+ setupNextPage(io, prevId, prevAddr, nextId, nextPageAddr);
- if (isWalDeltaRecordNeeded(wal, page))
+ if (needWalDeltaRecord(prevId, prevPage, walPlc))
wal.log(new PagesListSetNextRecord(cacheId, prevId, nextId));
// Here we should never write full page, because it is known to be new.
- next.fullPageWalRecordPolicy(FALSE);
-
- if (isWalDeltaRecordNeeded(wal, next))
+ if (needWalDeltaRecord(nextId, nextPage, walPlc = FALSE))
wal.log(new PagesListInitNewPageRecord(
cacheId,
nextId,
@@ -822,14 +865,17 @@ public abstract class PagesList extends DataStructure {
// Switch to this new page, which is now a part of our list
// to add the rest of the bag to the new page.
- prevPageAddr = nextPageAddr;
+ prevAddr = nextPageAddr;
prevId = nextId;
- page = next;
+ prevPage = nextPage;
+ }
+ finally {
+ releasePage(nextId, nextPage);
}
}
else {
// TODO: use single WAL record for bag?
- if (isWalDeltaRecordNeeded(wal, page))
+ if (needWalDeltaRecord(prevId, prevPage, walPlc))
wal.log(new PagesListAddPageRecord(cacheId, prevId, nextId));
bucketsSize[bucket].incrementAndGet();
@@ -842,8 +888,10 @@ public abstract class PagesList extends DataStructure {
updateTail(bucket, pageId, prevId);
// Release write.
- for (int i = 0; i < locked.size(); i++)
- writeUnlock(locked.get(i), lockedAddrs.get(i), true);
+ for (int i = 0; i < locked.size(); i++) {
+ long[] vals = locked.get(i);
+ writeUnlock(vals[0], vals[1], vals[2], FALSE, true);
+ }
}
}
@@ -876,15 +924,16 @@ public abstract class PagesList extends DataStructure {
}
/**
- * @param page Page.
+ * @param pageId Page ID.
+ * @param page Page pointer.
* @param bucket Bucket.
* @param lockAttempt Lock attempts counter.
* @return Page address if page is locked of {@code null} if can retry lock.
* @throws IgniteCheckedException If failed.
*/
- private long writeLockPage(Page page, int bucket, int lockAttempt)
+ private long writeLockPage(long pageId, long page, int bucket, int lockAttempt)
throws IgniteCheckedException {
- long pageAddr = tryWriteLock(page);
+ long pageAddr = tryWriteLock(pageId, page);
if (pageAddr != 0L)
return pageAddr;
@@ -900,7 +949,7 @@ public abstract class PagesList extends DataStructure {
}
}
- return lockAttempt < TRY_LOCK_ATTEMPTS ? 0L : writeLock(page); // Must be explicitly checked further.
+ return lockAttempt < TRY_LOCK_ATTEMPTS ? 0L : writeLock(pageId, page); // Must be explicitly checked further.
}
/**
@@ -929,12 +978,13 @@ public abstract class PagesList extends DataStructure {
if (stripe == null)
return 0L;
- long tailId = stripe.tailId;
+ final long tailId = stripe.tailId;
+ final long tailPage = acquirePage(tailId);
- try (Page tail = page(tailId)) {
- long tailPageAddr = writeLockPage(tail, bucket, lockAttempt++); // Explicit check.
+ try {
+ long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++); // Explicit check.
- if (tailPageAddr == 0L) {
+ if (tailAddr == 0L) {
if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS)
addStripeForReuseBucket(bucket);
@@ -943,7 +993,7 @@ public abstract class PagesList extends DataStructure {
if (stripe.empty) {
// Another thread took the last page.
- writeUnlock(tail, tailPageAddr, false);
+ writeUnlock(tailId, tailPage, tailAddr, false);
if (bucketsSize[bucket].get() > 0) {
lockAttempt--; // Ignore current attempt.
@@ -954,48 +1004,45 @@ public abstract class PagesList extends DataStructure {
return 0L;
}
- assert PageIO.getPageId(tailPageAddr) == tailId : "tailId = " + tailId + ", tailPageId = " + PageIO.getPageId(tailPageAddr);
- assert PageIO.getType(tailPageAddr) == PageIO.T_PAGE_LIST_NODE;
+ assert PageIO.getPageId(tailAddr) == tailId : "tailId = " + tailId + ", tailPageId = " + PageIO.getPageId(tailAddr);
+ assert PageIO.getType(tailAddr) == PageIO.T_PAGE_LIST_NODE;
boolean dirty = false;
long ret;
long recycleId = 0L;
try {
- PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailPageAddr);
+ PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(tailAddr);
- if (io.getNextId(tailPageAddr) != 0) {
+ if (io.getNextId(tailAddr) != 0) {
// It is not a tail anymore, retry.
continue;
}
- long pageId = io.takeAnyPage(tailPageAddr);
+ long pageId = io.takeAnyPage(tailAddr);
if (pageId != 0L) {
bucketsSize[bucket].decrementAndGet();
- if (isWalDeltaRecordNeeded(wal, tail))
+ if (needWalDeltaRecord(tailId, tailPage, null))
wal.log(new PagesListRemovePageRecord(cacheId, tailId, pageId));
dirty = true;
ret = pageId;
- if (io.isEmpty(tailPageAddr)) {
- long prevId = io.getPreviousId(tailPageAddr);
+ if (io.isEmpty(tailAddr)) {
+ long prevId = io.getPreviousId(tailAddr);
// If we got an empty page in non-reuse bucket, move it back to reuse list
// to prevent empty page leak to data pages.
if (!isReuseBucket(bucket)) {
if (prevId != 0L) {
- try (Page prev = page(prevId)) {
- // Lock pages from next to previous.
- Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
+ Boolean ok = write(prevId, cutTail, null, bucket, FALSE);
- assert ok == TRUE : ok;
- }
+ assert ok == TRUE : ok;
- recycleId = recyclePage(tailId, tail, tailPageAddr);
+ recycleId = recyclePage(tailId, tailPage, tailAddr, null);
}
else
stripe.empty = true;
@@ -1010,37 +1057,32 @@ public abstract class PagesList extends DataStructure {
// a previous page, so, the current page can be collected
assert isReuseBucket(bucket);
- long prevId = io.getPreviousId(tailPageAddr);
+ long prevId = io.getPreviousId(tailAddr);
assert prevId != 0L;
- try (Page prev = page(prevId)) {
- // Lock pages from next to previous.
- Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
+ Boolean ok = write(prevId, cutTail, bucket, FALSE);
- assert ok == TRUE : ok;
+ assert ok == TRUE : ok;
- bucketsSize[bucket].decrementAndGet();
- }
+ bucketsSize[bucket].decrementAndGet();
if (initIoVers != null) {
- tailId = PageIdUtils.changeType(tailId, FLAG_DATA);
+ ret = PageIdUtils.changeType(tailId, FLAG_DATA);
PageIO initIo = initIoVers.latest();
- initIo.initNewPage(tailPageAddr, tailId, pageSize());
+ initIo.initNewPage(tailAddr, ret, pageSize());
- if (isWalDeltaRecordNeeded(wal, tail)) {
- wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(),
- initIo.getVersion(), tailId));
+ if (needWalDeltaRecord(tailId, tailPage, null)) {
+ wal.log(new InitNewPageRecord(cacheId, tailId, initIo.getType(),
+ initIo.getVersion(), ret));
}
}
else
- tailId = recyclePage(tailId, tail, tailPageAddr);
+ ret = recyclePage(tailId, tailPage, tailAddr, null);
dirty = true;
-
- ret = tailId;
}
// If we do not have a previous page (we are at head), then we still can return
@@ -1049,7 +1091,7 @@ public abstract class PagesList extends DataStructure {
// meta page.
}
finally {
- writeUnlock(tail, tailPageAddr, dirty);
+ writeUnlock(tailId, tailPage, tailAddr, dirty);
}
// Put recycled page (if any) to the reuse bucket after tail is unlocked.
@@ -1061,31 +1103,39 @@ public abstract class PagesList extends DataStructure {
return ret;
}
+ finally {
+ releasePage(tailId, tailPage);
+ }
}
}
/**
- * @param dataPage Data page.
- * @param dataPageAddr Data page address.
+ * @param dataId Data page ID.
+ * @param dataPage Data page pointer.
+ * @param dataAddr Data page address.
* @param dataIO Data page IO.
* @param bucket Bucket index.
* @throws IgniteCheckedException If failed.
* @return {@code True} if page was removed.
*/
- protected final boolean removeDataPage(Page dataPage, long dataPageAddr, DataPageIO dataIO, int bucket)
+ protected final boolean removeDataPage(
+ final long dataId,
+ final long dataPage,
+ final long dataAddr,
+ DataPageIO dataIO,
+ int bucket)
throws IgniteCheckedException {
- long dataPageId = dataPage.id();
-
- long pageId = dataIO.getFreeListPageId(dataPageAddr);
+ final long pageId = dataIO.getFreeListPageId(dataAddr);
assert pageId != 0;
- try (Page page = page(pageId)) {
+ final long page = acquirePage(pageId);
+ try {
long nextId;
long recycleId = 0L;
- long pageAddr = writeLock(page); // Explicit check.
+ long pageAddr = writeLock(pageId, page); // Explicit check.
if (pageAddr == 0L)
return false;
@@ -1095,21 +1145,21 @@ public abstract class PagesList extends DataStructure {
try {
PagesListNodeIO io = PagesListNodeIO.VERSIONS.forPage(pageAddr);
- rmvd = io.removePage(pageAddr, dataPageId);
+ rmvd = io.removePage(pageAddr, dataId);
if (!rmvd)
return false;
bucketsSize[bucket].decrementAndGet();
- if (isWalDeltaRecordNeeded(wal, page))
- wal.log(new PagesListRemovePageRecord(cacheId, pageId, dataPageId));
+ if (needWalDeltaRecord(pageId, page, null))
+ wal.log(new PagesListRemovePageRecord(cacheId, pageId, dataId));
// Reset free list page ID.
- dataIO.setFreeListPageId(dataPageAddr, 0L);
+ dataIO.setFreeListPageId(dataAddr, 0L);
- if (isWalDeltaRecordNeeded(wal, dataPage))
- wal.log(new DataPageSetFreeListPageRecord(cacheId, dataPageId, 0L));
+ if (needWalDeltaRecord(dataId, dataPage, null))
+ wal.log(new DataPageSetFreeListPageRecord(cacheId, dataId, 0L));
if (!io.isEmpty(pageAddr))
return true; // In optimistic case we still have something in the page and can leave it as is.
@@ -1126,7 +1176,7 @@ public abstract class PagesList extends DataStructure {
}
}
finally {
- writeUnlock(page, pageAddr, rmvd);
+ writeUnlock(pageId, page, pageAddr, rmvd);
}
// Perform a fair merge after lock release (to have a correct locking order).
@@ -1138,65 +1188,40 @@ public abstract class PagesList extends DataStructure {
return true;
}
- }
-
- /**
- * @param page Page.
- * @param pageId Page ID.
- * @param pageAddr Page address.
- * @param prevId Previous page ID.
- * @param bucket Bucket index.
- * @return Page ID to recycle.
- * @throws IgniteCheckedException If failed.
- */
- private long mergeNoNext(long pageId, Page page, long pageAddr, long prevId, int bucket)
- throws IgniteCheckedException {
- // If we do not have a next page (we are tail) and we are on reuse bucket,
- // then we can leave as is as well, because it is normal to have an empty tail page here.
- if (isReuseBucket(bucket))
- return 0L;
-
- if (prevId != 0L) { // Cut tail if we have a previous page.
- try (Page prev = page(prevId)) {
- Boolean ok = writePage(pageMem, prev, this, cutTail, null, bucket, FALSE);
-
- assert ok == TRUE: ok; // Because we keep lock on current tail and do a world consistency check.
- }
- }
- else {
- // If we don't have a previous, then we are tail page of free list, just drop the stripe.
- boolean rmvd = updateTail(bucket, pageId, 0L);
-
- if (!rmvd)
- return 0L;
+ finally {
+ releasePage(pageId, page);
}
-
- return recyclePage(pageId, page, pageAddr);
}
/**
* @param pageId Page ID.
- * @param page Page.
+ * @param page Page pointer.
* @param nextId Next page ID.
* @param bucket Bucket index.
* @return Page ID to recycle.
* @throws IgniteCheckedException If failed.
*/
- private long merge(long pageId, Page page, long nextId, int bucket)
+ private long merge(
+ final long pageId,
+ final long page,
+ long nextId,
+ int bucket)
throws IgniteCheckedException {
assert nextId != 0; // We should do mergeNoNext then.
// Lock all the pages in correct order (from next to previous) and do the merge in retry loop.
for (;;) {
- try (Page next = nextId == 0L ? null : page(nextId)) {
+ final long curId = nextId;
+ final long curPage = curId == 0L ? 0L : acquirePage(curId);
+ try {
boolean write = false;
- long nextPageAddr = next == null ? 0L : writeLock(next); // Explicit check.
- long pageAddr = writeLock(page); // Explicit check.
+ final long curAddr = curPage == 0L ? 0L : writeLock(curId, curPage); // Explicit check.
+ final long pageAddr = writeLock(pageId, page); // Explicit check.
if (pageAddr == 0L) {
- if (nextPageAddr != 0L) // Unlock next page if needed.
- writeUnlock(next, nextPageAddr, false);
+ if (curAddr != 0L) // Unlock next page if needed.
+ writeUnlock(curId, curPage, curAddr, false);
return 0L; // Someone has merged or taken our empty page concurrently. Nothing to do here.
}
@@ -1208,8 +1233,8 @@ public abstract class PagesList extends DataStructure {
return 0L; // No need to merge anymore.
// Check if we see a consistent state of the world.
- if (io.getNextId(pageAddr) == nextId && (nextId == 0L) == (nextPageAddr == 0L)) {
- long recycleId = doMerge(pageId, page, pageAddr, io, next, nextId, nextPageAddr, bucket);
+ if (io.getNextId(pageAddr) == curId && (curId == 0L) == (curAddr == 0L)) {
+ long recycleId = doMerge(pageId, page, pageAddr, io, curId, curPage, curAddr, bucket);
write = true;
@@ -1220,35 +1245,76 @@ public abstract class PagesList extends DataStructure {
nextId = io.getNextId(pageAddr);
}
finally {
- if (nextPageAddr != 0L)
- writeUnlock(next, nextPageAddr, write);
+ if (curAddr != 0L)
+ writeUnlock(curId, curPage, curAddr, write);
- writeUnlock(page, pageAddr, write);
+ writeUnlock(pageId, page, pageAddr, write);
}
}
+ finally {
+ if(curPage != 0L)
+ releasePage(curId, curPage);
+ }
}
}
/**
- * @param page Page.
* @param pageId Page ID.
- * @param io IO.
+ * @param page Page pointer.
* @param pageAddr Page address.
- * @param next Next page.
+ * @param prevId Previous page ID.
+ * @param bucket Bucket index.
+ * @return Page ID to recycle.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long mergeNoNext(
+ long pageId,
+ long page,
+ long pageAddr,
+ long prevId,
+ int bucket)
+ throws IgniteCheckedException {
+ // If we do not have a next page (we are tail) and we are on reuse bucket,
+ // then we can leave as is as well, because it is normal to have an empty tail page here.
+ if (isReuseBucket(bucket))
+ return 0L;
+
+ if (prevId != 0L) { // Cut tail if we have a previous page.
+ Boolean ok = write(prevId, cutTail, null, bucket, FALSE);
+
+ assert ok == TRUE: ok;
+ }
+ else {
+ // If we don't have a previous, then we are tail page of free list, just drop the stripe.
+ boolean rmvd = updateTail(bucket, pageId, 0L);
+
+ if (!rmvd)
+ return 0L;
+ }
+
+ return recyclePage(pageId, page, pageAddr, null);
+ }
+
+ /**
+ * @param pageId Page ID.
+ * @param page Page absolute pointer.
+ * @param pageAddr Page address.
+ * @param io IO.
* @param nextId Next page ID.
- * @param nextPageAddr Next page address.
+ * @param nextPage Next page absolute pointer.
+ * @param nextAddr Next page address.
* @param bucket Bucket index.
* @return Page to recycle.
* @throws IgniteCheckedException If failed.
*/
private long doMerge(
long pageId,
- Page page,
+ long page,
long pageAddr,
PagesListNodeIO io,
- Page next,
long nextId,
- long nextPageAddr,
+ long nextPage,
+ long nextAddr,
int bucket
) throws IgniteCheckedException {
long prevId = io.getPreviousId(pageAddr);
@@ -1257,86 +1323,71 @@ public abstract class PagesList extends DataStructure {
return mergeNoNext(pageId, page, pageAddr, prevId, bucket);
else {
// No one must be able to merge it while we keep a reference.
- assert getPageId(nextPageAddr) == nextId;
+ assert getPageId(nextAddr) == nextId;
if (prevId == 0L) { // No previous page: we are at head.
// These references must be updated at the same time in write locks.
- assert PagesListNodeIO.VERSIONS.forPage(nextPageAddr).getPreviousId(nextPageAddr) == pageId;
+ assert PagesListNodeIO.VERSIONS.forPage(nextAddr).getPreviousId(nextAddr) == pageId;
- PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextPageAddr);
- nextIO.setPreviousId(nextPageAddr, 0);
+ PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextAddr);
+ nextIO.setPreviousId(nextAddr, 0);
- if (isWalDeltaRecordNeeded(wal, next))
+ if (needWalDeltaRecord(nextId, nextPage, null))
wal.log(new PagesListSetPreviousRecord(cacheId, nextId, 0L));
}
else // Do a fair merge: link previous and next to each other.
- fairMerge(prevId, pageId, nextId, next, nextPageAddr);
+ fairMerge(prevId, pageId, nextId, nextPage, nextAddr);
- return recyclePage(pageId, page, pageAddr);
+ return recyclePage(pageId, page, pageAddr, null);
}
}
/**
* Link previous and next to each other.
- *
* @param prevId Previous Previous page ID.
* @param pageId Page ID.
- * @param next Next page.
* @param nextId Next page ID.
- * @param nextPageAddr Next page address.
+ * @param nextPage Next page absolute pointer.
+ * @param nextAddr Next page address.
* @throws IgniteCheckedException If failed.
*/
- private void fairMerge(long prevId,
+ private void fairMerge(
+ final long prevId,
long pageId,
long nextId,
- Page next,
- long nextPageAddr)
+ long nextPage,
+ long nextAddr)
throws IgniteCheckedException {
- try (Page prev = page(prevId)) {
- long prevPageAddr = writeLock(prev); // No check, we keep a reference.
-
- assert prevPageAddr != 0L;
+ long prevPage = acquirePage(prevId);
+ try {
+ final long prevAddr = writeLock(prevId, prevPage); // No check, we keep a reference.
+ assert prevAddr != 0L;
try {
- PagesListNodeIO prevIO = PagesListNodeIO.VERSIONS.forPage(prevPageAddr);
- PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextPageAddr);
+ PagesListNodeIO prevIO = PagesListNodeIO.VERSIONS.forPage(prevAddr);
+ PagesListNodeIO nextIO = PagesListNodeIO.VERSIONS.forPage(nextAddr);
// These references must be updated at the same time in write locks.
- assert prevIO.getNextId(prevPageAddr) == pageId;
- assert nextIO.getPreviousId(nextPageAddr) == pageId;
+ assert prevIO.getNextId(prevAddr) == pageId;
+ assert nextIO.getPreviousId(nextAddr) == pageId;
- prevIO.setNextId(prevPageAddr, nextId);
+ prevIO.setNextId(prevAddr, nextId);
- if (isWalDeltaRecordNeeded(wal, prev))
+ if (needWalDeltaRecord(prevId, prevPage, null))
wal.log(new PagesListSetNextRecord(cacheId, prevId, nextId));
- nextIO.setPreviousId(nextPageAddr, prevId);
+ nextIO.setPreviousId(nextAddr, prevId);
- if (isWalDeltaRecordNeeded(wal, next))
+ if (needWalDeltaRecord(nextId, nextPage, null))
wal.log(new PagesListSetPreviousRecord(cacheId, nextId, prevId));
}
finally {
- writeUnlock(prev, prevPageAddr, true);
+ writeUnlock(prevId, prevPage, prevAddr, true);
}
}
- }
-
- /**
- * @param page Page.
- * @param pageId Page ID.
- * @param pageAddr Page address.
- * @return Rotated page ID.
- * @throws IgniteCheckedException If failed.
- */
- private long recyclePage(long pageId, Page page, long pageAddr) throws IgniteCheckedException {
- pageId = PageIdUtils.rotatePageId(pageId);
-
- PageIO.setPageId(pageAddr, pageId);
-
- if (isWalDeltaRecordNeeded(wal, page))
- wal.log(new RecycleRecord(cacheId, page.id(), pageId));
-
- return pageId;
+ finally {
+ releasePage(prevId, prevPage);
+ }
}
/**