abdullah alamoudi has submitted this change and it was merged. Change subject: [NO ISSUE][STO] Misc fixes in storage ......................................................................
[NO ISSUE][STO] Misc fixes in storage - user model changes: no - storage format changes: no - interface changes: no Details: - Blocking IO callback used to wait for any notification on the callback before returning. The behaviour was fixed to only return if the completion flag was set on afterFinalize. - Reading and writing to and from memory component's didn't do any locking and so, this could read to concurrency issues. - Reading metadata values used to rely on pointables which can be problematic because then the caller will need to latch/pin the page correctly. To avoid this, readers of metadata pages will always take a copy of the metadata. Change-Id: I4bdc4c16a9c126d311378e56651632bbb4a50864 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2548 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> --- M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java 16 files changed, 187 insertions(+), 102 deletions(-) Approvals: Anon. E. Moose #1000171: abdullah alamoudi: Looks good to me, but someone else must approve Jenkins: Verified; No violations found; ; Verified Murtadha Hubail: Looks good to me, approved diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java index f9421a1..1251d91 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/FlushMetadataOnlyTest.java @@ -28,7 +28,7 @@ import org.apache.asterix.test.common.TestHelper; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.data.std.util.DataUtils; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; @@ -106,7 +106,7 @@ StorageTestUtils.flush(dsLifecycleMgr, lsmBtree, false); // assert one disk component Assert.assertEquals(1, lsmBtree.getDiskComponents().size()); - VoidPointable pointable = VoidPointable.FACTORY.createPointable(); + ArrayBackedValueStorage pointable = new ArrayBackedValueStorage(); ComponentUtils.get(lsmBtree, key, pointable); Assert.assertTrue(DataUtils.equals(pointable, value)); // ensure that we can search this component diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index 0a968c8..2121327 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -56,6 +56,7 @@ import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation; import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; @@ -156,9 +157,10 @@ iHelperFactory.create(ctx.getJobletContext().getServiceContext(), 0); dataflowHelper.open(); LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance(); - LongPointable longPointable = LongPointable.FACTORY.createPointable(); - ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, longPointable); - long lsn = longPointable.getLong(); + ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(); + + ComponentUtils.get(btree, ComponentUtils.MARKER_LSN_KEY, buffer); + long lsn = LongPointable.getLong(buffer.getByteArray(), buffer.getStartOffset()); int numOfMarkers = 0; LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false); long expectedMarkerId = markerId - 1; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java index b9f0cc7..f027979 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java @@ -28,6 +28,7 @@ import org.apache.asterix.common.storage.ResourceReference; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -68,6 +69,7 @@ protected ILSMComponentId[] nextComponentIds; protected final ILSMComponentIdGenerator idGenerator; + protected final ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(Long.BYTES); private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private final Map<ILSMComponentId, Long> componentLsnMap = new HashMap<>(); @@ -128,7 +130,7 @@ } LongPointable markerLsn = LongPointable.FACTORY .createPointable(ComponentUtils.getLong(opCtx.getComponentsToBeMerged().get(0).getMetadata(), - ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND)); + ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND, buffer)); opCtx.getNewComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn); } else if (opCtx.getIoOperationType() == LSMIOOperationType.FLUSH) { // advance memory component indexes diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java index abe474b..a2ab15f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; @@ -34,6 +35,7 @@ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback { private final LongPointable pointable = LongPointable.FACTORY.createPointable(); private final ILSMIndex index; + private final ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(Long.BYTES); /** * @param index: @@ -53,7 +55,7 @@ long lsn; try { lsn = ComponentUtils.getLong(index.getCurrentMemoryComponent().getMetadata(), ComponentUtils.MARKER_LSN_KEY, - ComponentUtils.NOT_FOUND); + ComponentUtils.NOT_FOUND, buffer); } catch (HyracksDataException e) { // Should never happen since this is a memory component throw new IllegalStateException(e); @@ -76,7 +78,7 @@ for (ILSMDiskComponent c : diskComponents) { try { long lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY, - ComponentUtils.NOT_FOUND); + ComponentUtils.NOT_FOUND, buffer); if (lsn != ComponentUtils.NOT_FOUND) { return lsn; } @@ -101,7 +103,7 @@ if (c.isReadable()) { try { lsn = ComponentUtils.getLong(c.getMetadata(), ComponentUtils.MARKER_LSN_KEY, - ComponentUtils.NOT_FOUND); + ComponentUtils.NOT_FOUND, buffer); } catch (HyracksDataException e) { // Should never happen since this is a memory component throw new IllegalStateException(e); @@ -117,7 +119,11 @@ @Override public void after(long lsn) { pointable.setLong(lsn); - index.getCurrentMemoryComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, pointable); + try { + index.getCurrentMemoryComponent().getMetadata().put(ComponentUtils.MARKER_LSN_KEY, pointable); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } } public ILSMIndex getIndex() { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 8635efd..1846062 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -115,7 +115,7 @@ 96 = Illegal attempt to enter empty component 97 = Illegal attempt to exit empty component 98 = A flush operation has failed -99 = A merge operation has failed +99 = A merge operation has failed. The component %1$s was found in the list of index components 100 = Failed to shutdown event processor for %1$s 101 = Page %1$s does not exist in file %2$s 102 = Failed to open virtual buffer cache since it is already open diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java index e075f4e..d5a4481 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java @@ -22,12 +22,22 @@ import java.io.IOException; import java.util.Objects; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IMutableValueStorage; +import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.api.IValueReference; -public class ArrayBackedValueStorage implements IMutableValueStorage { +public class ArrayBackedValueStorage implements IMutableValueStorage, IPointable { - private final GrowableArray data = new GrowableArray(); + private final GrowableArray data; + + public ArrayBackedValueStorage(int size) { + data = new GrowableArray(size); + } + + public ArrayBackedValueStorage() { + data = new GrowableArray(); + } @Override public void reset() { @@ -54,16 +64,15 @@ return data.getLength(); } - //TODO: don't swallow, but throw the exception - public void append(IValueReference value) { + public void append(IValueReference value) throws HyracksDataException { try { data.append(value); } catch (IOException e) { - e.printStackTrace(); + throw HyracksDataException.create(e); } } - public void assign(IValueReference value) { + public void assign(IValueReference value) throws HyracksDataException { reset(); append(value); } @@ -89,4 +98,31 @@ return Objects.equals(data, other.data); } + @Override + public void set(byte[] bytes, int start, int length) { + reset(); + if (bytes != null) { + try { + data.append(bytes, start, length); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public void set(IValueReference pointer) { + try { + assign(pointer); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } + + public byte[] toByteArray() { + byte[] byteArray = new byte[getLength()]; + System.arraycopy(getByteArray(), getStartOffset(), byteArray, 0, getLength()); + return byteArray; + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java index 994d286..12486d9 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/GrowableArray.java @@ -26,8 +26,18 @@ import org.apache.hyracks.data.std.api.IValueReference; public class GrowableArray implements IDataOutputProvider { - private final ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream(); - private final RewindableDataOutputStream dos = new RewindableDataOutputStream(baaos); + private final ByteArrayAccessibleOutputStream baaos; + private final RewindableDataOutputStream dos; + + public GrowableArray() { + baaos = new ByteArrayAccessibleOutputStream(); + dos = new RewindableDataOutputStream(baaos); + } + + public GrowableArray(int size) { + baaos = new ByteArrayAccessibleOutputStream(size); + dos = new RewindableDataOutputStream(baaos); + } @Override public DataOutput getDataOutput() { @@ -65,7 +75,11 @@ } public void append(IValueReference value) throws IOException { - dos.write(value.getByteArray(), value.getStartOffset(), value.getLength()); + append(value.getByteArray(), value.getStartOffset(), value.getLength()); + } + + public void append(byte[] data, int offset, int length) throws IOException { + dos.write(data, offset, length); } public void setSize(int bytesRequired) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java index 5dee557..fa69d7a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java @@ -19,8 +19,8 @@ package org.apache.hyracks.storage.am.lsm.common.api; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; public interface IComponentMetadata { @@ -41,14 +41,5 @@ * @param value * @throws HyracksDataException */ - void get(IValueReference key, IPointable value) throws HyracksDataException; - - /** - * Get the value - * - * @param key - * @return - * @throws HyracksDataException - */ - IValueReference get(IValueReference key) throws HyracksDataException; + void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 26c7b0d..c4616d3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; @@ -36,6 +37,7 @@ private static final Logger LOGGER = LogManager.getLogger(); private final DiskComponentMetadata metadata; + private final ArrayBackedValueStorage buffer = new ArrayBackedValueStorage(Long.BYTES); // a variable cache of componentId stored in metadata. // since componentId is immutable, we do not want to read from metadata every time the componentId @@ -121,7 +123,7 @@ } synchronized (this) { if (componentId == null) { - componentId = LSMComponentIdUtils.readFrom(metadata); + componentId = LSMComponentIdUtils.readFrom(metadata, buffer); } } if (componentId.missing()) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java index 042720c..a8ee286 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallbackWrapper.java @@ -34,7 +34,7 @@ } public synchronized void waitForIO() throws InterruptedException { - if (!notified) { + while (!notified) { wait(); } notified = false; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java index d1244ce..649989c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java @@ -19,9 +19,8 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; @@ -39,15 +38,8 @@ } @Override - public void get(IValueReference key, IPointable value) throws HyracksDataException { + public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { mdpManager.get(mdpManager.createMetadataFrame(), key, value); - } - - @Override - public IValueReference get(IValueReference key) throws HyracksDataException { - VoidPointable value = VoidPointable.FACTORY.createPointable(); - get(key, value); - return value; } public void put(MemoryComponentMetadata metadata) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java index 7d1925b..d0fe8a9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java @@ -19,8 +19,8 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; public class EmptyDiskComponentMetadata extends DiskComponentMetadata { public static final EmptyDiskComponentMetadata INSTANCE = new EmptyDiskComponentMetadata(); @@ -35,12 +35,7 @@ } @Override - public void get(IValueReference key, IPointable value) throws HyracksDataException { - throw new IllegalStateException("Attempt to read metadata of empty component"); - } - - @Override - public IValueReference get(IValueReference key) throws HyracksDataException { + public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { throw new IllegalStateException("Attempt to read metadata of empty component"); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index eed8f6e..59f48d4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -601,6 +601,7 @@ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) { + LOGGER.info("Failed to enter components for merge operation. Calling finalize"); ctx.setIoOperationType(LSMIOOperationType.MERGE); callback.afterFinalize(ctx); return; @@ -871,10 +872,12 @@ scheduleMerge(ctx, ioCallback); } IOOperationUtils.waitForIoOperation(ioCallback); - // ensure that merge has succeeded - for (ILSMDiskComponent component : toBeDeleted) { - if (lsmIndex.getDiskComponents().contains(component)) { - throw HyracksDataException.create(ErrorCode.A_MERGE_OPERATION_HAS_FAILED); + synchronized (opTracker) { + // ensure that merge has succeeded + for (ILSMDiskComponent component : toBeDeleted) { + if (lsmIndex.getDiskComponents().contains(component)) { + throw HyracksDataException.create(ErrorCode.A_MERGE_OPERATION_HAS_FAILED, component.toString()); + } } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java index 3179790..e73fa0a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java @@ -20,10 +20,10 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; @@ -35,61 +35,94 @@ public class MemoryComponentMetadata implements IComponentMetadata { private static final Logger LOGGER = LogManager.getLogger(); - private static final byte[] empty = new byte[0]; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store = new ArrayList<>(); /** * Note: for memory metadata, it is expected that the key will be constant + * + * @throws HyracksDataException */ @Override - public void put(IValueReference key, IValueReference value) { - ArrayBackedValueStorage stored = get(key); - if (stored == null) { - stored = new ArrayBackedValueStorage(); - store.add(Pair.of(key, stored)); + public void put(IValueReference key, IValueReference value) throws HyracksDataException { + lock.writeLock().lock(); + try { + ArrayBackedValueStorage stored = get(key); + if (stored == null) { + stored = new ArrayBackedValueStorage(); + store.add(Pair.of(key, stored)); + } + stored.assign(value); + } finally { + lock.writeLock().unlock(); } - stored.assign(value); } /** * Note: for memory metadata, it is expected that the key will be constant + * + * @throws HyracksDataException */ @Override - public void get(IValueReference key, IPointable value) { - value.set(empty, 0, 0); - ArrayBackedValueStorage stored = get(key); - if (stored != null) { - value.set(stored); + public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { + lock.readLock().lock(); + try { + value.reset(); + ArrayBackedValueStorage stored = get(key); + if (stored != null) { + value.append(stored); + } + } finally { + lock.readLock().unlock(); } } - @Override - public ArrayBackedValueStorage get(IValueReference key) { - for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) { - if (pair.getKey().equals(key)) { - return pair.getValue(); + private ArrayBackedValueStorage get(IValueReference key) { + lock.readLock().lock(); + try { + for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) { + if (pair.getKey().equals(key)) { + return pair.getValue(); + } } + return null; + } finally { + lock.readLock().unlock(); } - return null; } public void copy(IMetadataPageManager mdpManager) throws HyracksDataException { - LOGGER.log(Level.INFO, "Copying Metadata into a different component"); - ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame(); - for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + pair.getValue().getLength() + " bytes"); + lock.readLock().lock(); + try { + LOGGER.log(Level.INFO, "Copying Metadata into a different component"); + ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame(); + for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) { + if (LOGGER.isInfoEnabled()) { + LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " + pair.getValue().getLength() + " bytes"); + } + mdpManager.put(frame, pair.getKey(), pair.getValue()); } - mdpManager.put(frame, pair.getKey(), pair.getValue()); + } finally { + lock.readLock().unlock(); } } public void copy(DiskComponentMetadata metadata) throws HyracksDataException { - metadata.put(this); + lock.readLock().lock(); + try { + metadata.put(this); + } finally { + lock.readLock().unlock(); + } } public void reset() { - store.clear(); + lock.writeLock().lock(); + try { + store.clear(); + } finally { + lock.writeLock().unlock(); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java index 94a3702..4b7f338 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java @@ -24,6 +24,7 @@ import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.bloomfilter.impls.BloomFilter; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; @@ -59,10 +60,10 @@ * @throws HyracksDataException * If the comopnent was a disk component and an IO error was encountered */ - public static long getLong(IComponentMetadata metadata, IValueReference key, long defaultValue) - throws HyracksDataException { - IValueReference value = metadata.get(key); - return value == null || value.getLength() == 0 ? defaultValue + public static long getLong(IComponentMetadata metadata, IValueReference key, long defaultValue, + ArrayBackedValueStorage value) throws HyracksDataException { + metadata.get(key, value); + return value.getLength() == 0 ? defaultValue : LongPointable.getLong(value.getByteArray(), value.getStartOffset()); } @@ -73,31 +74,36 @@ * * @param index * @param key - * @param pointable + * @param value * @throws HyracksDataException */ - public static void get(ILSMIndex index, IValueReference key, IPointable pointable) throws HyracksDataException { + public static void get(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) + throws HyracksDataException { boolean loggable = LOGGER.isDebugEnabled(); + value.reset(); if (loggable) { LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index); } // Lock the opTracker to ensure index components don't change synchronized (index.getOperationTracker()) { - index.getCurrentMemoryComponent().getMetadata().get(key, pointable); - if (pointable.getLength() == 0) { + ILSMMemoryComponent cmc = index.getCurrentMemoryComponent(); + if (cmc.isReadable()) { + index.getCurrentMemoryComponent().getMetadata().get(key, value); + } + if (value.getLength() == 0) { if (loggable) { LOGGER.log(Level.DEBUG, key + " was not found in mutable memory component of " + index); } // was not found in the in current mutable component, search in the other in memory components - fromImmutableMemoryComponents(index, key, pointable); - if (pointable.getLength() == 0) { + fromImmutableMemoryComponents(index, key, value); + if (value.getLength() == 0) { if (loggable) { LOGGER.log(Level.DEBUG, key + " was not found in all immmutable memory components of " + index); } // was not found in the in all in memory components, search in the disk components - fromDiskComponents(index, key, pointable); + fromDiskComponents(index, key, value); if (loggable) { - if (pointable.getLength() == 0) { + if (value.getLength() == 0) { LOGGER.log(Level.DEBUG, key + " was not found in all disk components of " + index); } else { LOGGER.log(Level.DEBUG, key + " was found in disk components of " + index); @@ -134,7 +140,7 @@ } } - private static void fromDiskComponents(ILSMIndex index, IValueReference key, IPointable pointable) + private static void fromDiskComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { boolean loggable = LOGGER.isDebugEnabled(); if (loggable) { @@ -144,15 +150,16 @@ if (loggable) { LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components " + c); } - c.getMetadata().get(key, pointable); - if (pointable.getLength() != 0) { + c.getMetadata().get(key, value); + if (value.getLength() != 0) { // Found return; } } } - private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, IPointable pointable) { + private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, + ArrayBackedValueStorage value) throws HyracksDataException { boolean loggable = LOGGER.isDebugEnabled(); if (loggable) { LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable memory components of " + index); @@ -174,8 +181,8 @@ } ILSMMemoryComponent c = index.getMemoryComponents().get(next); if (c.isReadable()) { - c.getMetadata().get(key, pointable); - if (pointable.getLength() != 0) { + c.getMetadata().get(key, value); + if (value.getLength() != 0) { // Found return; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java index 3c88543..6d4b0a7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java @@ -20,6 +20,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; @@ -37,9 +38,10 @@ } - public static ILSMComponentId readFrom(IComponentMetadata metadata) throws HyracksDataException { - long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND); - long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND); + public static ILSMComponentId readFrom(IComponentMetadata metadata, ArrayBackedValueStorage buffer) + throws HyracksDataException { + long minId = ComponentUtils.getLong(metadata, COMPONENT_ID_MIN_KEY, LSMComponentId.NOT_FOUND, buffer); + long maxId = ComponentUtils.getLong(metadata, COMPONENT_ID_MAX_KEY, LSMComponentId.NOT_FOUND, buffer); if (minId == LSMComponentId.NOT_FOUND || maxId == LSMComponentId.NOT_FOUND) { return LSMComponentId.MISSING_COMPONENT_ID; } else { -- To view, visit https://asterix-gerrit.ics.uci.edu/2548 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I4bdc4c16a9c126d311378e56651632bbb4a50864 Gerrit-PatchSet: 8 Gerrit-Project: asterixdb Gerrit-Branch: release-0.9.4-pre-rc Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>