Repository: asterixdb Updated Branches: refs/heads/master 31d8102aa -> 8c427cd4b
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java index 8f005d8..77020f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java @@ -22,8 +22,8 @@ import java.io.PrintStream; import java.nio.ByteBuffer; import org.apache.hyracks.api.comm.FrameHelper; +import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.HyracksConstants; @@ -36,24 +36,24 @@ import org.apache.hyracks.util.IntSerDeUtils; * This appender must only be used on network boundary */ public class MessagingFrameTupleAppender extends FrameTupleAppender { - - private final IHyracksTaskContext ctx; - private static final int NULL_MESSAGE_SIZE = 1; + public static final int NULL_MESSAGE_SIZE = 1; public static final byte NULL_FEED_MESSAGE = 0x01; public static final byte ACK_REQ_FEED_MESSAGE = 0x02; public static final byte MARKER_MESSAGE = 0x03; + + private final IHyracksTaskContext ctx; private boolean initialized = false; - private VSizeFrame message; + private IFrame message; public MessagingFrameTupleAppender(IHyracksTaskContext ctx) { this.ctx = ctx; } - public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException { + public static void printMessage(IFrame message, PrintStream out) throws HyracksDataException { out.println(getMessageString(message)); } - public static String getMessageString(VSizeFrame message) throws HyracksDataException { + public static String getMessageString(IFrame message) throws HyracksDataException { StringBuilder aString = new StringBuilder(); aString.append("Message Type: "); switch (getMessageType(message)) { @@ -76,7 +76,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { return aString.toString(); } - public static byte getMessageType(VSizeFrame message) throws HyracksDataException { + public static byte getMessageType(IFrame message) throws HyracksDataException { switch (message.getBuffer().array()[0]) { case NULL_FEED_MESSAGE: return NULL_FEED_MESSAGE; @@ -105,15 +105,13 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { @Override public int getTupleCount() { - // if message is set, there is always a message. that message could be a null message (TODO: optimize) - return tupleCount + ((message == null) ? 0 : 1); + return tupleCount + 1; } @Override public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException { if (!initialized) { - message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx); - initialized = true; + init(); } // If message fits, we append it, otherwise, we append a null message, then send a message only // frame with the message @@ -125,7 +123,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { } else { ByteBuffer buffer = message.getBuffer(); int messageSize = buffer.limit() - buffer.position(); - if (hasEnoughSpace(1, messageSize)) { + if (hasEnoughSpace(0, messageSize)) { appendMessage(buffer); forward(outWriter); } else { @@ -133,7 +131,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { appendNullMessage(); forward(outWriter); } - if (!hasEnoughSpace(1, messageSize)) { + if (!hasEnoughSpace(0, messageSize)) { frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize())); reset(frame.getBuffer(), true); } @@ -143,6 +141,11 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { } } + private void init() { + message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx); + initialized = true; + } + private void forward(IFrameWriter outWriter) throws HyracksDataException { getBuffer().clear(); outWriter.nextFrame(getBuffer()); @@ -168,4 +171,13 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { ++tupleCount; IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); } + + /* + * Always write and then flush to send out the message if exists + */ + @Override + public void flush(IFrameWriter writer) throws HyracksDataException { + write(writer, true); + writer.flush(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java index 6d87d89..dbd3afa 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java @@ -41,25 +41,30 @@ public class PartitionDataWriter implements IFrameWriter { private final ITuplePartitionComputer tpc; private final IHyracksTaskContext ctx; private boolean[] allocatedFrames; + private boolean failed = false; public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException { + this.ctx = ctx; + this.tpc = tpc; this.consumerPartitionCount = consumerPartitionCount; pWriters = new IFrameWriter[consumerPartitionCount]; isOpen = new boolean[consumerPartitionCount]; allocatedFrames = new boolean[consumerPartitionCount]; appenders = new FrameTupleAppender[consumerPartitionCount]; + tupleAccessor = new FrameTupleAccessor(recordDescriptor); + initializeAppenders(pwFactory); + } + + protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException { for (int i = 0; i < consumerPartitionCount; ++i) { try { pWriters[i] = pwFactory.createFrameWriter(i); appenders[i] = createTupleAppender(ctx); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } - tupleAccessor = new FrameTupleAccessor(recordDescriptor); - this.tpc = tpc; - this.ctx = ctx; } protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) { @@ -71,25 +76,17 @@ public class PartitionDataWriter implements IFrameWriter { HyracksDataException closeException = null; for (int i = 0; i < pWriters.length; ++i) { if (isOpen[i]) { - if (allocatedFrames[i] && appenders[i].getTupleCount() > 0) { + if (allocatedFrames[i] && appenders[i].getTupleCount() > 0 && !failed) { try { appenders[i].write(pWriters[i], true); } catch (Throwable th) { - if (closeException == null) { - closeException = new HyracksDataException(th); - } else { - closeException.addSuppressed(th); - } + closeException = HyracksDataException.suppress(closeException, th); } } try { pWriters[i].close(); } catch (Throwable th) { - if (closeException == null) { - closeException = new HyracksDataException(th); - } else { - closeException.addSuppressed(th); - } + closeException = HyracksDataException.suppress(closeException, th); } } } @@ -126,17 +123,14 @@ public class PartitionDataWriter implements IFrameWriter { @Override public void fail() throws HyracksDataException { + failed = true; HyracksDataException failException = null; for (int i = 0; i < appenders.length; ++i) { if (isOpen[i]) { try { pWriters[i].fail(); } catch (Throwable th) { - if (failException == null) { - failException = new HyracksDataException(th); - } else { - failException.addSuppressed(th); - } + failException = HyracksDataException.suppress(failException, th); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java index a985b4d..b89922e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java @@ -27,7 +27,7 @@ import org.apache.hyracks.storage.common.file.LocalResource; @FunctionalInterface public interface IModificationOperationCallbackFactory extends Serializable { - public IModificationOperationCallback createModificationOperationCallback(LocalResource resource, + IModificationOperationCallback createModificationOperationCallback(LocalResource resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java index 627994c..a19e69a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java @@ -23,10 +23,6 @@ import org.apache.hyracks.data.std.api.IValueReference; public class MutableArrayValueReference implements IValueReference { private byte[] array; - public MutableArrayValueReference() { - //mutable array. user doesn't need to specify the array in advance - } - public MutableArrayValueReference(byte[] array) { this.array = array; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java new file mode 100644 index 0000000..de72690 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * An interface that is used to enable frame level operation on indexes + */ +@FunctionalInterface +public interface IFrameOperationCallback { + /** + * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in + * the pipeline + * + * @param modified + * true if the index was modified during the processing of the frame, false otherwise + * @throws HyracksDataException + */ + void frameCompleted(boolean modified) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java new file mode 100644 index 0000000..8031d32 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +import java.io.Serializable; + +import org.apache.hyracks.api.context.IHyracksTaskContext; + +/** + * A factory for {@link IFrameOperationCallback} + */ +@FunctionalInterface +public interface IFrameOperationCallbackFactory extends Serializable { + /** + * Create a {@link IFrameOperationCallback} for an index operator + * + * @param ctx + * the task context + * @param indexAccessor + * the accessor for the index + * @return an instance of {@link IFrameOperationCallback} + */ + IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx, ILSMIndexAccessor indexAccessor); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 7a2bc7c..f21c8a3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.api; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexCursor; import org.apache.hyracks.storage.am.common.api.ISearchPredicate; @@ -28,34 +29,174 @@ import org.apache.hyracks.storage.am.common.api.IndexException; public interface ILSMHarness { + /** + * Force modification even if memory component is full + * + * @param ctx + * the operation context + * @param tuple + * the operation tuple + * @throws HyracksDataException + * @throws IndexException + */ void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException, IndexException; + /** + * Modify the index if the memory component is not full, wait for a new memory component if the current one is full + * + * @param ctx + * the operation context + * @param tryOperation + * true if IO operation + * @param tuple + * the operation tuple + * @return + * @throws HyracksDataException + * @throws IndexException + */ boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple) throws HyracksDataException, IndexException; + /** + * Search the index + * + * @param ctx + * the search operation context + * @param cursor + * the index cursor + * @param pred + * the search predicate + * @throws HyracksDataException + * @throws IndexException + */ void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException, IndexException; + /** + * End the search + * + * @param ctx + * @throws HyracksDataException + */ void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException; + /** + * Schedule a merge + * + * @param ctx + * @param callback + * @throws HyracksDataException + * @throws IndexException + */ void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException, IndexException; + /** + * Schedule full merge + * + * @param ctx + * @param callback + * @throws HyracksDataException + * @throws IndexException + */ void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException, IndexException; + /** + * Perform a merge operation + * + * @param ctx + * @param operation + * @throws HyracksDataException + * @throws IndexException + */ void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException, IndexException; + /** + * Schedule a flush + * + * @param ctx + * @param callback + * @throws HyracksDataException + */ void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; + /** + * Perform a flush + * + * @param ctx + * @param operation + * @throws HyracksDataException + * @throws IndexException + */ void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException, IndexException; + /** + * Add bulk loaded component + * + * @param index + * the new component + * @throws HyracksDataException + * @throws IndexException + */ void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException, IndexException; + /** + * Get index operation tracker + */ ILSMOperationTracker getOperationTracker(); + /** + * Schedule replication + * + * @param ctx + * the operation context + * @param diskComponents + * the disk component to be replicated + * @param bulkload + * true if the components were bulk loaded, false otherwise + * @param opType + * The operation type + * @throws HyracksDataException + */ void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException; + /** + * End a replication operation + * + * @param ctx + * the operation context + * @throws HyracksDataException + */ void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException; + + /** + * Update the metadata of the memory component of the index. Waiting for a new memory component if + * the current memory component is full + * + * @param ctx + * the operation context + * @param key + * the meta key + * @param value + * the meta value + * @throws HyracksDataException + */ + void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value) + throws HyracksDataException; + + /** + * Force updating the metadata of the memory component of the index even if memory component is full + * + * @param ctx + * the operation context + * @param key + * the meta key + * @param value + * the meta value + * @throws HyracksDataException + */ + void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value) + throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index fecc674..90c70aa 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.api; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexAccessor; import org.apache.hyracks.storage.am.common.api.IndexException; @@ -34,16 +35,43 @@ import org.apache.hyracks.storage.am.common.api.TreeIndexException; * concurrent operations). */ public interface ILSMIndexAccessor extends IIndexAccessor { + /** + * Schedule a flush operation + * + * @param callback + * the IO operation callback + * @throws HyracksDataException + */ void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException; + /** + * Schedule a merge operation + * + * @param callback + * the merge operation callback + * @param components + * the components to be merged + * @throws HyracksDataException + * @throws IndexException + */ void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components) throws HyracksDataException, IndexException; + /** + * Schedule a full merge + * + * @param callback + * the merge operation callback + * @throws HyracksDataException + * @throws IndexException + */ void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException; /** - * Deletes the tuple from the memory component only. + * Delete the tuple from the memory component only. Don't replace with antimatter tuple * + * @param tuple + * the tuple to be deleted * @throws HyracksDataException * @throws IndexException */ @@ -113,12 +141,49 @@ public interface ILSMIndexAccessor extends IIndexAccessor { */ boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException; + /** + * Delete the tuple from the memory component only. Don't replace with antimatter tuple + * Perform operation even if the memory component is full + * + * @param tuple + * the tuple to delete + * @throws HyracksDataException + * @throws IndexException + */ void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException; + /** + * Insert a new tuple (failing if duplicate key entry is found) + * + * @param tuple + * the tuple to insert + * @throws HyracksDataException + * @throws IndexException + */ void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException; + /** + * Force deleting an index entry even if the memory component is full + * replace the entry if found with an antimatter tuple, otherwise, simply insert the antimatter tuple + * + * @param tuple + * tuple to delete + * @throws HyracksDataException + * @throws IndexException + */ void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException; + /** + * Schedule a replication for disk components + * + * @param diskComponents + * the components to be replicated + * @param bulkload + * true if the components were bulkloaded, false otherwise + * @param opType + * the operation type + * @throws HyracksDataException + */ void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType) throws HyracksDataException; @@ -137,4 +202,24 @@ public interface ILSMIndexAccessor extends IIndexAccessor { * @throws TreeIndexException */ void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException; + + /** + * Update the metadata of the memory component, wait for the new component if the current one is UNWRITABLE + * + * @param key + * the key + * @param value + * the value + * @throws HyracksDataException + */ + void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException; + + /** + * Force update the metadata of the current memory component even if it is UNWRITABLE + * + * @param key + * @param value + * @throws HyracksDataException + */ + void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 6bf9312..01e85d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -28,16 +28,17 @@ import java.util.logging.Logger; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; +import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexCursor; import org.apache.hyracks.storage.am.common.api.ISearchPredicate; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; @@ -360,6 +361,44 @@ public class LSMHarness implements ILSMHarness { return modify(ctx, tryOperation, tuple, opType); } + @Override + public void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value) + throws HyracksDataException { + if (!lsmIndex.isMemoryComponentsAllocated()) { + lsmIndex.allocateMemoryComponents(); + } + getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false); + try { + lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value); + } finally { + exitAndComplete(ctx, LSMOperationType.MODIFICATION); + } + } + + private void exitAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { + try { + exitComponents(ctx, op, null, false); + } catch (IndexException e) { + throw new HyracksDataException(e); + } finally { + opTracker.completeOperation(null, op, null, ctx.getModificationCallback()); + } + } + + @Override + public void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value) + throws HyracksDataException { + if (!lsmIndex.isMemoryComponentsAllocated()) { + lsmIndex.allocateMemoryComponents(); + } + getAndEnterComponents(ctx, LSMOperationType.FORCE_MODIFICATION, false); + try { + lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value); + } finally { + exitAndComplete(ctx, LSMOperationType.FORCE_MODIFICATION); + } + } + private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple, LSMOperationType opType) throws HyracksDataException, IndexException { if (!lsmIndex.isMemoryComponentsAllocated()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 4199cfb..0fa69ec 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -22,6 +22,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexCursor; import org.apache.hyracks.storage.am.common.api.ISearchPredicate; @@ -164,4 +165,18 @@ public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor { ctx.setOperation(IndexOperation.DELETE); lsmHarness.forceModify(ctx, tuple); } + + @Override + public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException { + // a hack because delete only gets the memory component + ctx.setOperation(IndexOperation.DELETE); + lsmHarness.updateMeta(ctx,key,value); + } + + @Override + public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException { + // a hack because delete only gets the memory component + ctx.setOperation(IndexOperation.DELETE); + lsmHarness.forceUpdateMeta(ctx, key, value); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index cef4257..0a6ffd7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.IIndexCursor; import org.apache.hyracks.storage.am.common.api.ISearchPredicate; @@ -185,4 +186,18 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index."); } + @Override + public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException { + // a hack because delete only gets the memory component + ctx.setOperation(IndexOperation.DELETE); + lsmHarness.updateMeta(ctx, key, value); + } + + @Override + public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException { + // a hack because delete only gets the memory component + ctx.setOperation(IndexOperation.DELETE); + lsmHarness.forceUpdateMeta(ctx, key, value); + } + }