[NO ISSUE] Report all BufferCache write failures. - user model changes: no - storage format changes: no - interface changes: yes + IPageWriteFailureCallback: used to notify async IO caller when something goes wrong.
Details: - Before this change, it was possible for failures to be lost and for bulk-load operations to be unaware of failures to write some pages. This can be dangerous. - To avoid this, when a page is sent to be written, a PageWriteFailureCallback is associated with the page to notify the caller that a failure took place. Change-Id: I97fd3dccff85dab84d644359be6f66b15ee708ef Reviewed-on: https://asterix-gerrit.ics.uci.edu/2787 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Luo Chen <cl...@uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7c72a503 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7c72a503 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7c72a503 Branch: refs/heads/master Commit: 7c72a503d18cd870dd18fe61e27fb04c7a18f442 Parents: 47ab031 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Mon Jul 23 14:31:23 2018 -0700 Committer: abdullah alamoudi <bamou...@gmail.com> Committed: Tue Jul 24 11:13:31 2018 -0700 ---------------------------------------------------------------------- .../am/bloomfilter/impls/BloomFilter.java | 32 +++++++++--- .../hyracks/storage/am/btree/impls/BTree.java | 12 ++--- .../storage/am/common/api/IPageManager.java | 11 ++++- .../AppendOnlyLinkedMetadataPageManager.java | 7 ++- .../freepage/LinkedMetaDataPageManager.java | 8 ++- .../am/common/impls/AbstractTreeIndex.java | 38 +++------------ .../am/lsm/btree/impls/ExternalBTree.java | 25 ++++++++-- .../lsm/btree/impls/ExternalBTreeWithBuddy.java | 25 ++++++++-- ...AbstractLSMWithBloomFilterDiskComponent.java | 5 +- .../api/AbstractLSMWithBuddyDiskComponent.java | 7 +-- .../am/lsm/common/api/ILSMDiskComponent.java | 5 +- 
.../storage/am/lsm/common/api/ILSMHarness.java | 6 +-- .../am/lsm/common/api/ILSMIOOperation.java | 4 +- .../common/freepage/VirtualFreePageManager.java | 3 +- .../lsm/common/impls/AbstractIoOperation.java | 13 ++++- .../common/impls/AbstractLSMDiskComponent.java | 5 +- .../lsm/common/impls/BloomFilterBulkLoader.java | 16 ++++++ .../ChainedLSMDiskComponentBulkLoader.java | 43 ++++++++++++++--- .../am/lsm/common/impls/EmptyComponent.java | 3 +- .../lsm/common/impls/ExternalIndexHarness.java | 13 ++++- .../am/lsm/common/impls/FilterBulkLoader.java | 16 ++++++ .../impls/IChainedComponentBulkLoader.java | 3 +- .../common/impls/IndexWithBuddyBulkLoader.java | 21 ++++++++ .../storage/am/lsm/common/impls/LSMHarness.java | 15 ++++-- .../am/lsm/common/impls/LSMIndexBulkLoader.java | 16 ++++++ .../impls/LSMIndexDiskComponentBulkLoader.java | 18 ++++++- .../am/lsm/common/impls/NoOpIoOperation.java | 11 +++++ .../am/lsm/common/impls/TracedIOOperation.java | 11 +++++ .../am/lsm/common/util/ComponentUtils.java | 11 +++-- .../impls/LSMInvertedIndexDiskComponent.java | 15 ++++-- .../ondisk/OnDiskInvertedIndex.java | 11 +++-- .../am/lsm/rtree/impls/ExternalRTree.java | 25 ++++++++-- .../hyracks/storage/am/rtree/impls/RTree.java | 10 ++-- .../hyracks/hyracks-storage-common/pom.xml | 5 ++ .../storage/common/IIndexBulkLoader.java | 4 +- .../buffercache/AsyncFIFOPageQueueManager.java | 51 ++++++++++++-------- .../storage/common/buffercache/CachedPage.java | 29 +++++++++-- .../common/buffercache/FIFOLocalWriter.java | 28 ++++++----- .../buffercache/HaltOnFailureCallback.java | 44 +++++++++++++++++ .../storage/common/buffercache/ICachedPage.java | 4 ++ .../common/buffercache/ICachedPageInternal.java | 11 +++-- .../common/buffercache/IFIFOPageQueue.java | 17 ++++++- .../common/buffercache/IFIFOPageWriter.java | 7 +-- .../buffercache/IPageWriteFailureCallback.java | 43 +++++++++++++++++ .../buffercache/PageWriteFailureCallback.java | 42 ++++++++++++++++ 
.../storage/common/buffercache/VirtualPage.java | 10 ++++ .../am/btree/DiskBTreeSearchCursorTest.java | 3 ++ .../hyracks/storage/common/BufferCacheTest.java | 4 +- 48 files changed, 610 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java index 6c16bd1..2dce1cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-bloomfilter/src/main/java/org/apache/hyracks/storage/am/bloomfilter/impls/BloomFilter.java @@ -29,6 +29,7 @@ import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; +import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; public class BloomFilter { @@ -275,7 +276,7 @@ public class BloomFilter { return new BloomFilterBuilder(numElements, numHashes, numBitsPerElement); } - public class BloomFilterBuilder implements IIndexBulkLoader { + public class BloomFilterBuilder extends PageWriteFailureCallback implements IIndexBulkLoader { private final long[] hashes = BloomFilter.createHashArray(); private final long estimatedNumElements; 
private final int numHashes; @@ -286,6 +287,7 @@ public class BloomFilter { private final ICachedPage[] pages; private ICachedPage metaDataPage = null; + @SuppressWarnings("squid:S1181") // Catch Throwable Must return all confiscated pages public BloomFilterBuilder(long estimatedNumElemenets, int numHashes, int numBitsPerElement) throws HyracksDataException { if (!isActivated) { @@ -303,11 +305,22 @@ public class BloomFilter { actualNumElements = 0; pages = new ICachedPage[numPages]; int currentPageId = 1; - while (currentPageId <= numPages) { - ICachedPage page = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId)); - initPage(page.getBuffer().array()); - pages[currentPageId - 1] = page; - ++currentPageId; + try { + while (currentPageId <= numPages) { + ICachedPage page = + bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId)); + initPage(page.getBuffer().array()); + pages[currentPageId - 1] = page; + ++currentPageId; + } + } catch (Throwable th) { + // return confiscated pages + for (int i = 0; i < currentPageId; i++) { + if (pages[i] != null) { + bufferCache.returnPage(pages[i]); + } + } + throw th; } } @@ -364,11 +377,14 @@ public class BloomFilter { @Override public void end() throws HyracksDataException { allocateAndInitMetaDataPage(); - queue.put(metaDataPage); + queue.put(metaDataPage, this); for (ICachedPage p : pages) { - queue.put(p); + queue.put(p, this); } bufferCache.finishQueue(); + if (hasFailed()) { + throw HyracksDataException.create(getFailure()); + } BloomFilter.this.numBits = numBits; BloomFilter.this.numHashes = numHashes; BloomFilter.this.numElements = actualNumElements; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java index fb8770e..4fc8af9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java @@ -1012,9 +1012,7 @@ public class BTree extends AbstractTreeIndex { try { int tupleSize = Math.max(leafFrame.getBytesRequiredToWriteTuple(tuple), interiorFrame.getBytesRequiredToWriteTuple(tuple)); - NodeFrontier leafFrontier = nodeFrontiers.get(0); - int spaceNeeded = tupleWriter.bytesRequired(tuple) + slotSize; int spaceUsed = leafFrame.getBuffer().capacity() - leafFrame.getTotalFreeSpace(); @@ -1045,12 +1043,11 @@ public class BTree extends AbstractTreeIndex { ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId); - queue.put(leafFrontier.page); + queue.put(leafFrontier.page, this); for (ICachedPage c : pagesToWrite) { - queue.put(c); + queue.put(c, this); } pagesToWrite.clear(); - splitKey.setRightPage(leafFrontier.pageId); } if (tupleSize > maxTupleSize) { @@ -1155,7 +1152,7 @@ public class BTree extends AbstractTreeIndex { ICachedPage lastLeaf = nodeFrontiers.get(level).page; int lastLeafPage = nodeFrontiers.get(level).pageId; lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId)); - queue.put(lastLeaf); + queue.put(lastLeaf, this); nodeFrontiers.get(level).page = null; persistFrontiers(level + 1, lastLeafPage); return; @@ -1170,9 +1167,8 @@ public class BTree extends AbstractTreeIndex { ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage); int finalPageId = freePageManager.takePage(metaFrame); frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), 
finalPageId)); - queue.put(frontier.page); + queue.put(frontier.page, this); frontier.pageId = finalPageId; - persistFrontiers(level + 1, finalPageId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java index b7987f8..f8a929d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IPageManager.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.common.api; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; public interface IPageManager { @@ -42,12 +43,14 @@ public interface IPageManager { * 3. When we need to have a persisted state. 
* * Note: This method will not force indexes to disk driver using fsync + * * @throws HyracksDataException */ - void close() throws HyracksDataException; + void close(IPageWriteFailureCallback callback) throws HyracksDataException; /** * Create a metadata frame to be used for reading and writing to metadata pages + * * @return a new metadata frame */ ITreeIndexMetadataFrame createMetadataFrame(); @@ -87,6 +90,7 @@ public interface IPageManager { /** * Get the location of a block of free pages to use for index operations * This is used for records that are larger than a normal page + * * @param frame * A metadata frame to use to wrap metadata pages * @return The starting page location, or -1 if a block of free pages could be found or allocated @@ -107,6 +111,7 @@ public interface IPageManager { /** * Add a page back to the pool of free pages within an index file + * * @param frame * A metadata frame to use to wrap metadata pages * @param page @@ -129,6 +134,7 @@ public interface IPageManager { /** * Check whether the index is empty or not. 
+ * * @param frame * interior frame * @param rootPage @@ -140,6 +146,7 @@ public interface IPageManager { /** * Get the root page of the id + * * @return the root page * @throws HyracksDataException */ @@ -147,6 +154,7 @@ public interface IPageManager { /** * Get the first page to start the bulk load + * * @return * @throws HyracksDataException */ @@ -154,6 +162,7 @@ public interface IPageManager { /** * Set the root page id and finalize the bulk load operation + * * @param rootPage * @throws HyracksDataException */ http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java index 5c389d2..97e7ed7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java @@ -32,6 +32,7 @@ import org.apache.hyracks.storage.common.buffercache.BufferCache; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager { @@ 
-207,7 +208,7 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager } @Override - public void close() throws HyracksDataException { + public void close(IPageWriteFailureCallback callback) throws HyracksDataException { if (ready) { IFIFOPageQueue queue = bufferCache.createFIFOQueue(); ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame(); @@ -220,7 +221,9 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager } int finalMetaPage = getMaxPageId(metaFrame) + 1; confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage)); - queue.put(confiscatedPage); + // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page + // won't be flushed to disk because it won't be dirty until the write latch has been released. + queue.put(confiscatedPage, callback); bufferCache.finishQueue(); metadataPage = getMetadataPageId(); ready = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java index 951d824..e348e24 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java @@ -28,8 +28,14 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame; import 
org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrameFactory; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; +/** + * @deprecated + * This class must not be used. Instead, use {@link AppendOnlyLinkedMetadataPageManager} + */ +@Deprecated public class LinkedMetaDataPageManager implements IMetadataPageManager { private final IBufferCache bufferCache; private int fileId = -1; @@ -238,7 +244,7 @@ public class LinkedMetaDataPageManager implements IMetadataPageManager { } @Override - public void close() throws HyracksDataException { + public void close(IPageWriteFailureCallback callback) throws HyracksDataException { if (ready) { ICachedPage metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId()), false); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java index 905c99d..b77f14f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java @@ -26,19 +26,19 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.ErrorCode; import 
org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndex; -import org.apache.hyracks.storage.am.common.api.ITreeIndexAccessor; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame; import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter; import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; +import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; public abstract class AbstractTreeIndex implements ITreeIndex { @@ -95,7 +95,7 @@ public abstract class AbstractTreeIndex implements ITreeIndex { freePageManager.open(fileId); freePageManager.init(interiorFrameFactory, leafFrameFactory); setRootPage(); - freePageManager.close(); + freePageManager.close(HaltOnFailureCallback.INSTANCE); failed = false; } finally { bufferCache.closeFile(fileId); @@ -132,7 +132,7 @@ public abstract class AbstractTreeIndex implements ITreeIndex { if (!isActive) { throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_INDEX); } - freePageManager.close(); + freePageManager.close(HaltOnFailureCallback.INSTANCE); bufferCache.closeFile(fileId); isActive = false; } @@ -227,7 +227,7 @@ public abstract class AbstractTreeIndex implements ITreeIndex { return 
fieldCount; } - public abstract class AbstractTreeIndexBulkLoader implements IIndexBulkLoader { + public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallback implements IIndexBulkLoader { protected final MultiComparator cmp; protected final int slotSize; protected final int leafMaxBytes; @@ -297,6 +297,9 @@ public abstract class AbstractTreeIndex implements ITreeIndex { @Override public void end() throws HyracksDataException { bufferCache.finishQueue(); + if (hasFailed()) { + throw HyracksDataException.create(getFailure()); + } freePageManager.setRootPageId(rootPage); } @@ -317,31 +320,6 @@ public abstract class AbstractTreeIndex implements ITreeIndex { public void setLeafFrame(ITreeIndexFrame leafFrame) { this.leafFrame = leafFrame; } - - } - - public class TreeIndexInsertBulkLoader implements IIndexBulkLoader { - ITreeIndexAccessor accessor; - - public TreeIndexInsertBulkLoader() throws HyracksDataException { - accessor = (ITreeIndexAccessor) createAccessor(NoOpIndexAccessParameters.INSTANCE); - } - - @Override - public void add(ITupleReference tuple) throws HyracksDataException { - accessor.insert(tuple); - } - - @Override - public void end() throws HyracksDataException { - // do nothing - } - - @Override - public void abort() { - - } - } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java index a504f7e..8be75be 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTree.java @@ -60,6 +60,7 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.util.trace.ITracer; /** @@ -462,12 +463,15 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { if (isTransaction) { // Since this is a transaction component, validate and // deactivate. it could later be added or deleted - component.markAsValid(durable); - ioOpCallback.afterFinalize(loadOp); + try { + component.markAsValid(durable, loadOp); + } finally { + ioOpCallback.afterFinalize(loadOp); + } component.deactivate(); } else { ioOpCallback.afterFinalize(loadOp); - getHarness().addBulkLoadedComponent(component); + getHarness().addBulkLoadedComponent(loadOp); } } } finally { @@ -490,6 +494,21 @@ public class ExternalBTree extends LSMBTree implements ITwoPCIndex { ioOpCallback.completed(loadOp); } } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return componentBulkLoader.hasFailed(); + } + + @Override + public Throwable getFailure() { + return componentBulkLoader.getFailure(); + } } // The accessor for disk only indexes don't use modification callback and always carry the target index version with them 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java index b727a39..5bcf30d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddy.java @@ -65,6 +65,7 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.util.trace.ITracer; public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeIndex, ITwoPCIndex { @@ -544,12 +545,15 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd if (isTransaction) { // Since this is a transaction component, validate and // deactivate. 
it could later be added or deleted - component.markAsValid(durable); - ioOpCallback.afterFinalize(loadOp); + try { + component.markAsValid(durable, loadOp); + } finally { + ioOpCallback.afterFinalize(loadOp); + } component.deactivate(); } else { ioOpCallback.afterFinalize(loadOp); - getHarness().addBulkLoadedComponent(component); + getHarness().addBulkLoadedComponent(loadOp); } } } finally { @@ -574,6 +578,21 @@ public class ExternalBTreeWithBuddy extends AbstractLSMIndex implements ITreeInd ioOpCallback.completed(loadOp); } } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return componentBulkLoader.hasFailed(); + } + + @Override + public Throwable getFailure() { + return componentBulkLoader.getFailure(); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java index 329a54b..6b6e5e0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBloomFilterDiskComponent.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBul import 
org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; public abstract class AbstractLSMWithBloomFilterDiskComponent extends AbstractLSMDiskComponent { public AbstractLSMWithBloomFilterDiskComponent(AbstractLSMIndex lsmIndex, IMetadataPageManager mdPageManager, @@ -42,11 +43,11 @@ public abstract class AbstractLSMWithBloomFilterDiskComponent extends AbstractLS public abstract IBufferCache getBloomFilterBufferCache(); @Override - public void markAsValid(boolean persist) throws HyracksDataException { + public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { // The order of forcing the dirty page to be flushed is critical. The // bloom filter must be always done first. ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist); - super.markAsValid(persist); + super.markAsValid(persist, callback); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java index cace9e5..f7feb78 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/AbstractLSMWithBuddyDiskComponent.java @@ -26,6 +26,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoade import org.apache.hyracks.storage.am.lsm.common.impls.IndexWithBuddyBulkLoader; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; public abstract class AbstractLSMWithBuddyDiskComponent extends AbstractLSMWithBloomFilterDiskComponent { @@ -37,9 +38,9 @@ public abstract class AbstractLSMWithBuddyDiskComponent extends AbstractLSMWithB public abstract AbstractTreeIndex getBuddyIndex(); @Override - public void markAsValid(boolean persist) throws HyracksDataException { - super.markAsValid(persist); - ComponentUtils.markAsValid(getBuddyIndex(), persist); + public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { + super.markAsValid(persist, callback); + ComponentUtils.markAsValid(getBuddyIndex(), persist, callback); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java index 1500f37..543779c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMDiskComponent.java @@ -26,6 +26,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.ChainedLSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; import org.apache.hyracks.storage.am.lsm.common.impls.IChainedComponentBulkLoader; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; public interface ILSMDiskComponent extends ILSMComponent { @@ -68,9 +69,11 @@ public interface ILSMDiskComponent extends ILSMComponent { * * @param persist * whether the call should force data to disk before returning + * @param callback + * callback for when a page write operation fails * @throws HyracksDataException */ - void markAsValid(boolean persist) throws HyracksDataException; + void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException; /** * Activates the component http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index c4a0352..9e8c568 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -150,12 +150,12 @@ public interface ILSMHarness { /** * Add bulk 
loaded component * - * @param index - * the new component + * @param ioOperation + * the io operation that added the new component * @throws HyracksDataException * @throws IndexException */ - void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException; + void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException; /** * Get index operation tracker http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java index 3245455..0e13933 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java @@ -26,8 +26,9 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; -public interface ILSMIOOperation extends Callable<LSMIOOperationStatus> { +public interface ILSMIOOperation extends Callable<LSMIOOperationStatus>, IPageWriteFailureCallback { /** * Represents the io operation type @@ -94,6 +95,7 @@ public interface ILSMIOOperation extends Callable<LSMIOOperationStatus> { /** * @return the failure in the io operation if any, 
null otherwise */ + @Override Throwable getFailure(); /** http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java index 08c75dc..9d62c0d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/freepage/VirtualFreePageManager.java @@ -28,6 +28,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; public class VirtualFreePageManager implements IPageManager { @@ -88,7 +89,7 @@ public class VirtualFreePageManager implements IPageManager { } @Override - public void close() { + public void close(IPageWriteFailureCallback callback) { // Method doesn't make sense for this free page manager. 
} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java index fc9a362..3d76755 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractIoOperation.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public abstract class AbstractIoOperation implements ILSMIOOperation { @@ -37,7 +38,7 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { protected final FileReference target; protected final ILSMIOOperationCallback callback; protected final String indexIdentifier; - private Throwable failure; + private volatile Throwable failure; private LSMIOOperationStatus status = LSMIOOperationStatus.SUCCESS; private ILSMDiskComponent newComponent; private boolean completed = false; @@ -146,4 +147,14 @@ public abstract class AbstractIoOperation implements ILSMIOOperation { completeListeners.add(listener); } } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + 
setFailure(failure); + } + + @Override + public boolean hasFailed() { + return status == LSMIOOperationStatus.FAILURE; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index aa312fb..a88a19a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -156,8 +157,8 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl * @throws HyracksDataException */ @Override - public void markAsValid(boolean persist) throws HyracksDataException { - ComponentUtils.markAsValid(getMetadataHolder(), persist); + public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { + ComponentUtils.markAsValid(getMetadataHolder(), persist, 
callback); if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Marked as valid component with id: " + getId()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java index 29ca388..a9c70e0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BloomFilterBulkLoader.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class BloomFilterBulkLoader implements IChainedComponentBulkLoader { @@ -65,4 +66,19 @@ public class BloomFilterBulkLoader implements IChainedComponentBulkLoader { endedBloomFilterLoad = true; } } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return bulkLoader.hasFailed(); + } + + @Override + public Throwable getFailure() { + return bulkLoader.getFailure(); + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java index 6e0606a..ab59b59 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java @@ -27,6 +27,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.util.annotations.CriticalPath; /** @@ -96,8 +97,9 @@ public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkL public void cleanupArtifacts() throws HyracksDataException { if (!cleanedUpArtifacts) { cleanedUpArtifacts = true; - for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { - lsmOperation.cleanupArtifacts(); + final int bulkloadersCount = bulkloaderChain.size(); + for (int i = 0; i < bulkloadersCount; i++) { + bulkloaderChain.get(i).cleanupArtifacts();; } } diskComponent.deactivateAndDestroy(); @@ -106,8 +108,9 @@ public class 
ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkL @Override public void end() throws HyracksDataException { if (!cleanedUpArtifacts) { - for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { - lsmOperation.end(); + final int bulkloadersCount = bulkloaderChain.size(); + for (int i = 0; i < bulkloadersCount; i++) { + bulkloaderChain.get(i).end(); } if (isEmptyComponent && cleanupEmptyComponent) { cleanupArtifacts(); @@ -118,8 +121,9 @@ public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkL @Override public void abort() throws HyracksDataException { operation.setStatus(LSMIOOperationStatus.FAILURE); - for (IChainedComponentBulkLoader lsmOperation : bulkloaderChain) { - lsmOperation.abort(); + final int bulkloadersCount = bulkloaderChain.size(); + for (int i = 0; i < bulkloadersCount; i++) { + bulkloaderChain.get(i).abort(); } } @@ -127,4 +131,31 @@ public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkL public ILSMIOOperation getOperation() { return operation; } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + final int bulkloadersCount = bulkloaderChain.size(); + for (int i = 0; i < bulkloadersCount; i++) { + if (bulkloaderChain.get(i).hasFailed()) { + return true; + } + } + return false; + } + + @Override + public Throwable getFailure() { + final int bulkloadersCount = bulkloaderChain.size(); + for (int i = 0; i < bulkloadersCount; i++) { + if (bulkloaderChain.get(i).hasFailed()) { + return bulkloaderChain.get(i).getFailure(); + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java index 4c2ddb6..a2d9fdd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java @@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; public class EmptyComponent implements ILSMDiskComponent { public static final EmptyComponent INSTANCE = new EmptyComponent(); @@ -105,7 +106,7 @@ public class EmptyComponent implements ILSMDiskComponent { } @Override - public void markAsValid(boolean persist) throws HyracksDataException { + public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { // No Op } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java index ab70ba1..854e541 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java @@ -192,9 +192,18 @@ public class ExternalIndexHarness extends LSMHarness { } } + @SuppressWarnings("squid:S1181") @Override - public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException { - c.markAsValid(lsmIndex.isDurable()); + public void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException { + ILSMDiskComponent c = ioOperation.getNewComponent(); + try { + c.markAsValid(lsmIndex.isDurable(), ioOperation); + } catch (Throwable th) { + ioOperation.setFailure(th); + } + if (ioOperation.hasFailed()) { + throw HyracksDataException.create(ioOperation.getFailure()); + } synchronized (opTracker) { lsmIndex.addDiskComponent(c); if (replicationEnabled) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java index 43d2b0d..880f5be 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java @@ -26,6 +26,7 @@ import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; import org.apache.hyracks.storage.common.MultiComparator; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class FilterBulkLoader implements IChainedComponentBulkLoader { @@ -79,4 +80,19 @@ public class FilterBulkLoader implements IChainedComponentBulkLoader { filterTuple.reset(tuple); filter.update(filterTuple, filterCmp, NoOpOperationCallback.INSTANCE); } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return false; + } + + @Override + public Throwable getFailure() { + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java index 90ef127..1361c79 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IChainedComponentBulkLoader.java @@ -20,8 +20,9 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import 
org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; -public interface IChainedComponentBulkLoader { +public interface IChainedComponentBulkLoader extends IPageWriteFailureCallback { /** * Adds a tuple to the bulkloaded component * http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java index 4fb2919..394126d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexWithBuddyBulkLoader.java @@ -22,6 +22,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class IndexWithBuddyBulkLoader implements IChainedComponentBulkLoader { @@ -69,4 +70,24 @@ public class IndexWithBuddyBulkLoader implements IChainedComponentBulkLoader { bulkLoader.abort(); buddyBTreeBulkLoader.abort(); } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return bulkLoader.hasFailed() || buddyBTreeBulkLoader.hasFailed(); + } + + @Override 
+ public Throwable getFailure() { + if (bulkLoader.hasFailed()) { + return bulkLoader.getFailure(); + } else if (buddyBTreeBulkLoader.hasFailed()) { + return buddyBTreeBulkLoader.getFailure(); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 4d840b0..3eea0a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -537,7 +537,7 @@ public class LSMHarness implements ILSMHarness { operation.setNewComponent(newComponent); operation.getCallback().afterOperation(operation); if (newComponent != null) { - newComponent.markAsValid(lsmIndex.isDurable()); + newComponent.markAsValid(lsmIndex.isDurable(), operation); } } catch (Throwable e) { // NOSONAR Must catch all operation.setStatus(LSMIOOperationStatus.FAILURE); @@ -613,9 +613,18 @@ public class LSMHarness implements ILSMHarness { return operation; } + @SuppressWarnings("squid:S1181") @Override - public void addBulkLoadedComponent(ILSMDiskComponent c) throws HyracksDataException { - c.markAsValid(lsmIndex.isDurable()); + public void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException { + ILSMDiskComponent c = ioOperation.getNewComponent(); + try { + c.markAsValid(lsmIndex.isDurable(), ioOperation); + } catch (Throwable th) { + 
ioOperation.setFailure(th); + } + if (ioOperation.hasFailed()) { + throw HyracksDataException.create(ioOperation.getFailure()); + } synchronized (opTracker) { lsmIndex.addDiskComponent(c); if (replicationEnabled) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java index 84857f4..977697b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexBulkLoader.java @@ -23,6 +23,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.impls.AbstractTreeIndex.AbstractTreeIndexBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleWriter; import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class LSMIndexBulkLoader implements IChainedComponentBulkLoader { private final IIndexBulkLoader bulkLoader; @@ -64,4 +65,19 @@ public class LSMIndexBulkLoader implements IChainedComponentBulkLoader { public void abort() throws HyracksDataException { bulkLoader.abort(); } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return bulkLoader.hasFailed(); + } + + 
@Override + public Throwable getFailure() { + return bulkLoader.getFailure(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java index 10074f9..3a43ba7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java @@ -25,6 +25,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { private final AbstractLSMIndex lsmIndex; @@ -79,7 +80,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { } if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) { - lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation().getNewComponent()); + lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation()); } } finally { 
lsmIndex.getIOOperationCallback().completed(opCtx.getIoOperation()); @@ -100,4 +101,19 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader { } } + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return opCtx.getIoOperation().hasFailed(); + } + + @Override + public Throwable getFailure() { + return opCtx.getIoOperation().getFailure(); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java index f57c4ef..5ee1503 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoOpIoOperation.java @@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; public class NoOpIoOperation implements ILSMIOOperation { public static final NoOpIoOperation INSTANCE = new NoOpIoOperation(); @@ -126,4 +127,14 @@ public class NoOpIoOperation implements ILSMIOOperation { return 
null; } + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java index f1172f3..3345e3a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/TracedIOOperation.java @@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.util.trace.ITracer; import org.apache.hyracks.util.trace.ITracer.Scope; import org.apache.hyracks.util.trace.TraceUtils; @@ -162,4 +163,14 @@ class TracedIOOperation implements ILSMIOOperation { public Map<String, Object> getParameters() { return ioOp.getParameters(); } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + ioOp.writeFailed(page, failure); + } + + @Override + public boolean hasFailed() { + return ioOp.hasFailed(); + } } 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java index 4b7f338..1ff9fa8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java @@ -33,6 +33,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -190,12 +191,14 @@ public class ComponentUtils { } } - public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk) throws HyracksDataException { + public static void markAsValid(ITreeIndex treeIndex, boolean forceToDisk, IPageWriteFailureCallback callback) + throws HyracksDataException { int fileId = treeIndex.getFileId(); IBufferCache bufferCache = treeIndex.getBufferCache(); - treeIndex.getPageManager().close(); - // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page - // won't be flushed to disk because it won't be dirty until the write latch has been 
released. + treeIndex.getPageManager().close(callback); + if (callback.hasFailed()) { + throw HyracksDataException.create(callback.getFailure()); + } // Force modified metadata page to disk. // If the index is not durable, then the flush is not necessary. if (forceToDisk) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java index b030e83..41e72cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java @@ -34,6 +34,7 @@ import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex; import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskComponent { @@ -102,15 +103,19 @@ public class LSMInvertedIndexDiskComponent extends AbstractLSMWithBuddyDiskCompo } @Override - public void markAsValid(boolean persist) throws HyracksDataException { + public void markAsValid(boolean persist, 
IPageWriteFailureCallback callback) throws HyracksDataException { ComponentUtils.markAsValid(getBloomFilterBufferCache(), getBloomFilter(), persist); // Flush inverted index second. invIndex.getBufferCache().force((invIndex).getInvListsFileId(), true); - ComponentUtils.markAsValid(getMetadataHolder(), persist); - - // Flush deleted keys BTree. - ComponentUtils.markAsValid(getBuddyIndex(), persist); + ComponentUtils.markAsValid(getMetadataHolder(), persist, callback); + if (!callback.hasFailed()) { + // Flush deleted keys BTree. + ComponentUtils.markAsValid(getBuddyIndex(), persist, callback); + } + if (callback.hasFailed()) { + throw HyracksDataException.create(callback.getFailure()); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java index c3c9c21..0b504a6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java @@ -61,6 +61,7 @@ import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; +import 
org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback; import org.apache.hyracks.storage.common.file.BufferedFileHandle; /** @@ -231,7 +232,8 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { listCursor.open(initState, null); } - public abstract class AbstractOnDiskInvertedIndexBulkLoader implements IIndexBulkLoader { + public abstract class AbstractOnDiskInvertedIndexBulkLoader extends PageWriteFailureCallback + implements IIndexBulkLoader { protected final ArrayTupleBuilder btreeTupleBuilder; protected final ArrayTupleReference btreeTupleReference; protected final IIndexBulkLoader btreeBulkloader; @@ -272,7 +274,7 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { } protected void pinNextPage() throws HyracksDataException { - queue.put(currentPage); + queue.put(currentPage, this); currentPageId++; currentPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, currentPageId)); } @@ -352,10 +354,13 @@ public class OnDiskInvertedIndex implements IInPlaceInvertedIndex { btreeBulkloader.end(); if (currentPage != null) { - queue.put(currentPage); + queue.put(currentPage, this); } invListsMaxPageId = currentPageId; bufferCache.finishQueue(); + if (hasFailed()) { + throw HyracksDataException.create(getFailure()); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java index f902153..5e8bd0b 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTree.java @@ -59,6 +59,7 @@ import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.util.trace.ITracer; /** @@ -499,12 +500,15 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { if (isTransaction) { // Since this is a transaction component, validate and // deactivate. it could later be added or deleted - component.markAsValid(durable); - ioOpCallback.afterFinalize(loadOp); + try { + component.markAsValid(durable, loadOp); + } finally { + ioOpCallback.afterFinalize(loadOp); + } component.deactivate(); } else { ioOpCallback.afterFinalize(loadOp); - getHarness().addBulkLoadedComponent(component); + getHarness().addBulkLoadedComponent(loadOp); } } } finally { @@ -529,6 +533,21 @@ public class ExternalRTree extends LSMRTree implements ITwoPCIndex { ioOpCallback.completed(loadOp); } } + + @Override + public void writeFailed(ICachedPage page, Throwable failure) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasFailed() { + return loadOp.hasFailed(); + } + + @Override + public Throwable getFailure() { + return loadOp.getFailure(); + } } // The only change the the schedule merge is the method used to create the http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java index 0e455c5..f12f423 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java @@ -941,11 +941,10 @@ public class RTree extends AbstractTreeIndex { propagateBulk(1, false, pagesToWrite); leafFrontier.pageId = freePageManager.takePage(metaFrame); - queue.put(leafFrontier.page); + queue.put(leafFrontier.page, this); for (ICachedPage c : pagesToWrite) { - queue.put(c); + queue.put(c, this); } - pagesToWrite.clear(); leafFrontier.page = bufferCache .confiscatePage(BufferedFileHandle.getDiskPageId(getFileId(), leafFrontier.pageId)); @@ -975,7 +974,7 @@ public class RTree extends AbstractTreeIndex { } for (ICachedPage c : pagesToWrite) { - queue.put(c); + queue.put(c, this); } finish(); super.end(); @@ -1011,7 +1010,7 @@ public class RTree extends AbstractTreeIndex { ((RTreeNSMFrame) lowerFrame).adjustMBR(); interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0); } - queue.put(n.page); + queue.put(n.page, this); n.page = null; prevPageId = n.pageId; } @@ -1021,7 +1020,6 @@ public class RTree extends AbstractTreeIndex { protected void propagateBulk(int level, boolean toRoot, List<ICachedPage> pagesToWrite) throws HyracksDataException { - boolean propagated = false; if (level == 1) { lowerFrame = leafFrame; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml index 67d32dd..a76fe48 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml @@ -54,6 +54,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c72a503/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java index d6c954e..138705f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IIndexBulkLoader.java @@ -20,8 +20,9 @@ package org.apache.hyracks.storage.common; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; -public interface IIndexBulkLoader { +public interface IIndexBulkLoader extends IPageWriteFailureCallback { /** * Append a tuple to the index in the context of a bulk load. * @@ -36,6 +37,7 @@ public interface IIndexBulkLoader { /** * Finalize the bulk loading operation in the given context and release all resources. 
+ * After this method is called, the caller can neither add more tuples nor abort. * * @throws HyracksDataException * If the BufferCache throws while un/pinning or un/latching.