Luo Chen has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1791
Change subject: Add Disk Component Scan operation for primary LSM index ...................................................................... Add Disk Component Scan operation for primary LSM index -Added disk component scan operation for primary LSMBTree index, which will be used when creating a new secondary index -This operation scans all disk components of the primary index, and returns all tuples. Thus, tuples with the same primary key in different components would be returned separately. -The returned tuple has an extra int field, which indicates which component this tuple comes from, and a boolean flag, which indicates whether this tuple is an anti-matter tuple or not. Change-Id: I31b2c67c58cb0a440c1d2c26400af322e2f1c1e5 --- M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java A hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java 17 files changed, 622 insertions(+), 5 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/91/1791/1 diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 23031bc..d0c6f60 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -89,6 +89,7 @@ public static final int CANNOT_CLEAR_INACTIVE_INDEX = 53; public static final int CANNOT_ALLOCATE_MEMORY_FOR_INACTIVE_INDEX = 54; public static final int RESOURCE_DOES_NOT_EXIST = 55; + public static final int 
DISK_COMPONENT_SCAN_NOT_SUPPORTED_IN_SECONDARY_INDEX = 56; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 88e4204..3dbc926 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -74,5 +74,6 @@ 53 = Failed to clear the index since it is inactive 54 = Failed to allocate memory components for the index since it is inactive 55 = Resource does not exist for %1$s +56 = LSM disk component scan is not supported in Secondary LSM Index 10000 = The given rule collection %1$s is not an instance of the List class. diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java index 3df27ad..bd18ea0 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java @@ -26,6 +26,9 @@ import org.apache.hyracks.data.std.api.IPointableFactory; public final class BooleanPointable extends AbstractPointable implements IHashable, IComparable { + + public static final BooleanPointableFactory FACTORY = new BooleanPointableFactory(); + public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() { private static final long serialVersionUID = 1L; @@ -40,7 +43,7 @@ } }; - public static final IPointableFactory FACTORY = new IPointableFactory() { + public static class BooleanPointableFactory implements IPointableFactory { 
private static final long serialVersionUID = 1L; @Override @@ -48,11 +51,17 @@ return new BooleanPointable(); } + public IPointable createPointable(boolean value) { + BooleanPointable pointable = new BooleanPointable(); + pointable.setBoolean(value); + return pointable; + } + @Override public ITypeTraits getTypeTraits() { return TYPE_TRAITS; } - }; + } public static boolean getBoolean(byte[] bytes, int start) { return bytes[start] != 0; @@ -67,6 +76,11 @@ } public void setBoolean(boolean value) { + if (bytes == null) { + start = 0; + length = TYPE_TRAITS.getFixedLength(); + bytes = new byte[length]; + } setBoolean(bytes, start, value); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java index 43e0889..71a3f71 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java @@ -32,5 +32,6 @@ MERGE, FULL_MERGE, FLUSH, - REPLICATE + REPLICATE, + DISK_COMPONENT_SCAN } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 29532d4..5bbc3c0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -73,6 +73,7 @@ import org.apache.hyracks.storage.common.IModificationOperationCallback; import 
org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.file.IFileMapProvider; @@ -284,6 +285,19 @@ } @Override + public void scanDiskComponents(ILSMIndexOperationContext ictx, IIndexCursor cursor) throws HyracksDataException { + if (!isPrimaryIndex()) { + throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_SUPPORTED_IN_SECONDARY_INDEX); + } + LSMBTreeOpContext ctx = (LSMBTreeOpContext) ictx; + List<ILSMComponent> operationalComponents = ctx.getComponentHolder(); + MultiComparator comp = MultiComparator.create(getComparatorFactories()); + ISearchPredicate pred = new RangePredicate(null, null, true, true, comp, comp); + ctx.getSearchInitialState().reset(pred, operationalComponents); + ((LSMBTreeSearchCursor) cursor).scan(ctx.getSearchInitialState(), pred); + } + + @Override public ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException { LSMBTreeFlushOperation flushOp = (LSMBTreeFlushOperation) operation; LSMBTreeMemoryComponent flushingComponent = (LSMBTreeMemoryComponent) flushOp.getFlushingComponent(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java new file mode 100644 index 0000000..bcc7c35 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.storage.am.lsm.btree.impls; + +import java.util.logging.Logger; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.BooleanPointable; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.btree.api.IBTreeLeafFrame; +import org.apache.hyracks.storage.am.btree.impls.BTree; +import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor; +import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexSearchCursor; 
+import org.apache.hyracks.storage.common.ICursorInitialState; +import org.apache.hyracks.storage.common.IIndexCursor; +import org.apache.hyracks.storage.common.ISearchPredicate; +import org.apache.hyracks.storage.common.MultiComparator; + +public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor { + + private static final Logger LOGGER = Logger.getLogger(LSMBTreeDiskComponentScanCursor.class.getName()); + + private static final IValueReference MATTER_TUPLE_FLAG = BooleanPointable.FACTORY.createPointable(false); + private static final IValueReference ANTIMATTER_TUPLE_FLAG = BooleanPointable.FACTORY.createPointable(true); + + private BTreeAccessor[] btreeAccessors; + + private ArrayTupleBuilder tupleBuilder; + private ArrayTupleBuilder antiMatterTupleBuilder; + private final ArrayTupleReference outputTuple; + private PermutingTupleReference originalTuple; + + private boolean foundNext; + + private IntegerPointable cursorIndexPointable; + + public LSMBTreeDiskComponentScanCursor(ILSMIndexOperationContext opCtx) { + super(opCtx, true); + this.outputTuple = new ArrayTupleReference(); + } + + @Override + public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException { + LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState; + cmp = lsmInitialState.getOriginalKeyComparator(); + operationalComponents = lsmInitialState.getOperationalComponents(); + lsmHarness = lsmInitialState.getLSMHarness(); + includeMutableComponent = false; + + int numBTrees = operationalComponents.size(); + if (rangeCursors == null || rangeCursors.length != numBTrees) { + // object creation: should be relatively low + rangeCursors = new IIndexCursor[numBTrees]; + btreeAccessors = new BTreeAccessor[numBTrees]; + } + for (int i = 0; i < numBTrees; i++) { + ILSMComponent component = operationalComponents.get(i); + BTree btree; + if (rangeCursors[i] == null) { + // create, should be relatively rare + 
IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) lsmInitialState.getLeafFrameFactory().createFrame(); + rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false); + } else { + // re-use + rangeCursors[i].reset(); + } + btree = ((LSMBTreeDiskComponent) component).getBTree(); + + if (btreeAccessors[i] == null) { + btreeAccessors[i] = (BTreeAccessor) btree.createAccessor(NoOpOperationCallback.INSTANCE, + NoOpOperationCallback.INSTANCE); + } else { + // re-use + btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + } + btreeAccessors[i].search(rangeCursors[i], searchPred); + } + + cursorIndexPointable = new IntegerPointable(); + int length = IntegerPointable.TYPE_TRAITS.getFixedLength(); + cursorIndexPointable.set(new byte[length], 0, length); + + setPriorityQueueComparator(); + initPriorityQueue(); + } + + @Override + public void next() throws HyracksDataException { + foundNext = false; + } + + @Override + public boolean hasNext() throws HyracksDataException { + if (foundNext) { + return true; + } + while (super.hasNext()) { + super.next(); + LSMBTreeTupleReference diskTuple = (LSMBTreeTupleReference) super.getTuple(); + if (diskTuple.isAntimatter()) { + if (setAntiMatterTuple(diskTuple, outputElement.getCursorIndex())) { + foundNext = true; + return true; + } + } else { + //matter tuple + setMatterTuple(diskTuple, outputElement.getCursorIndex()); + foundNext = true; + return true; + } + } + + return false; + } + + @Override + protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB) + throws HyracksDataException { + // This method is used to check whether tupleA and tupleB (from different disk components) are identical. + // If so, the tuple from the older component is ignored by default. + // Here we use a simple trick so that tuples from different disk components are always not the same + // so that they would be returned by the cursor anyway. 
+ return -1; + } + + private void setMatterTuple(ITupleReference diskTuple, int cursorIndex) throws HyracksDataException { + if (tupleBuilder == null) { + tupleBuilder = new ArrayTupleBuilder(diskTuple.getFieldCount() + 2); + antiMatterTupleBuilder = new ArrayTupleBuilder(diskTuple.getFieldCount() + 2); + int[] permutation = new int[diskTuple.getFieldCount()]; + for (int i = 0; i < permutation.length; i++) { + permutation[i] = i + 2; + } + originalTuple = new PermutingTupleReference(permutation); + } + + //build the matter tuple + buildTuple(tupleBuilder, diskTuple, cursorIndex, MATTER_TUPLE_FLAG); + outputTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + originalTuple.reset(outputTuple); + } + + private boolean setAntiMatterTuple(ITupleReference diskTuple, int cursorIndex) throws HyracksDataException { + if (originalTuple == null || cmp.compare(diskTuple, originalTuple) != 0) { + //This shouldn't happen, because we shouldn't place an anti-matter tuple + //into the primary index if that tuple is not there + LOGGER.warning("Couldn't find the original tuple for the anti-matter tuple in LSMBTree."); + return false; + } + buildTuple(antiMatterTupleBuilder, originalTuple, cursorIndex, ANTIMATTER_TUPLE_FLAG); + outputTuple.reset(antiMatterTupleBuilder.getFieldEndOffsets(), antiMatterTupleBuilder.getByteArray()); + return true; + } + + private void buildTuple(ArrayTupleBuilder builder, ITupleReference diskTuple, int cursorIndex, + IValueReference tupleFlag) throws HyracksDataException { + builder.reset(); + cursorIndexPointable.setInteger(cursorIndex); + builder.addField(cursorIndexPointable); + builder.addField(tupleFlag); + for (int i = 0; i < diskTuple.getFieldCount(); i++) { + builder.addField(diskTuple.getFieldData(i), diskTuple.getFieldStart(i), diskTuple.getFieldLength(i)); + } + } + + @Override + public ITupleReference getTuple() { + return outputTuple; + } + + @Override + public void close() throws HyracksDataException { + if 
(lsmHarness != null) { + try { + for (int i = 0; i < rangeCursors.length; i++) { + rangeCursors[i].close(); + } + rangeCursors = null; + } finally { + lsmHarness.endScanDiskComponents(opCtx); + } + } + foundNext = false; + } + + @Override + protected void setPriorityQueueComparator() { + if (pqCmp == null || cmp != pqCmp.getMultiComparator()) { + pqCmp = new PriorityQueueScanComparator(cmp); + } + } + + private class PriorityQueueScanComparator extends PriorityQueueComparator { + public PriorityQueueScanComparator(MultiComparator cmp) { + super(cmp); + } + + @Override + public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) { + int result; + try { + result = cmp.compare(elementA.getTuple(), elementB.getTuple()); + if (result != 0) { + return result; + } + } catch (HyracksDataException e) { + throw new IllegalArgumentException(e); + } + + // the components in the component list are in descending order of creation time + // we want older components to be returned first + if (elementA.getCursorIndex() > elementB.getCursorIndex()) { + return -1; + } else { + return 1; + } + } + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java index c9bee31..fef8afe 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeSearchCursor.java @@ -37,22 +37,30 @@ private final LSMBTreePointSearchCursor pointCursor; private final LSMBTreeRangeSearchCursor rangeCursor; + private final LSMBTreeDiskComponentScanCursor scanCursor; private ITreeIndexCursor currentCursor; public 
LSMBTreeSearchCursor(ILSMIndexOperationContext opCtx) { pointCursor = new LSMBTreePointSearchCursor(opCtx); rangeCursor = new LSMBTreeRangeSearchCursor(opCtx); + scanCursor = new LSMBTreeDiskComponentScanCursor(opCtx); } @Override public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException { LSMBTreeCursorInitialState lsmInitialState = (LSMBTreeCursorInitialState) initialState; RangePredicate btreePred = (RangePredicate) searchPred; + currentCursor = btreePred.isPointPredicate(lsmInitialState.getOriginalKeyComparator()) ? pointCursor : rangeCursor; currentCursor.open(lsmInitialState, searchPred); } + public void scan(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException { + currentCursor = scanCursor; + currentCursor.open(initialState, searchPred); + } + @Override public boolean hasNext() throws HyracksDataException { return currentCursor.hasNext(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 0fe7e04..c0a3f2d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -81,6 +81,25 @@ void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException; /** + * Scan all disk components of the index + * + * @param ctx + * the search operation context + * @param cursor + * the index cursor + * @throws HyracksDataException + */ + void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException; + + /** + * End the scan + * + * @param ctx + * @throws HyracksDataException + */ + void 
endScanDiskComponents(ILSMIndexOperationContext ctx) throws HyracksDataException; + + /** * Schedule a merge * * @param ctx diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java index 2a9186b..28a39ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java @@ -65,6 +65,8 @@ void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException; + public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException; + void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException; ILSMDiskComponent flush(ILSMIOOperation operation) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index ad53e73..1042df2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -24,6 +24,7 @@ import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.common.IIndexAccessor; +import 
org.apache.hyracks.storage.common.IIndexCursor; /** * Client handle for performing operations @@ -230,4 +231,18 @@ * @throws HyracksDataException */ void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException; + + /** + * Open the given cursor for scanning all disk components of the primary index. + * The returned tuple has the format of [(int) disk_component_position, (boolean) anti-matter flag, + * primary key, payload]. + * The returned tuples are first ordered on primary key, and then ordered on the descending order of + * disk_component_position (older components get returned first) + * + * @param icursor + * Cursor over the index entries satisfying searchPred. + * @throws HyracksDataException + * If the BufferCache throws while un/pinning or un/latching. + */ + void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java index b083770..63d2697 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/LSMOperationType.java @@ -24,5 +24,6 @@ FORCE_MODIFICATION, FLUSH, MERGE, - REPLICATE + REPLICATE, + DISK_COMPONENT_SCAN } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 508a6cc..2a58d9c 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -45,6 +45,7 @@ case MODIFICATION: case REPLICATE: case SEARCH: + case DISK_COMPONENT_SCAN: readerCount++; break; case MERGE: @@ -78,6 +79,7 @@ case MODIFICATION: case REPLICATE: case SEARCH: + case DISK_COMPONENT_SCAN: readerCount--; if (readerCount == 0 && state == ComponentState.READABLE_MERGING) { state = ComponentState.INACTIVE; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 7c34326..45d723b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -59,6 +59,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndexBulkLoader; +import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.buffercache.IBufferCache; @@ -323,12 +324,20 @@ case REPLICATE: operationalComponents.addAll(ctx.getComponentsToBeReplicated()); break; + case DISK_COMPONENT_SCAN: + operationalComponents.addAll(immutableComponents); + break; default: throw new UnsupportedOperationException("Operation " + 
ctx.getOperation() + " not supported."); } } @Override + public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException { + throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_SUPPORTED_IN_SECONDARY_INDEX); + } + + @Override public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { ILSMMemoryComponent flushingComponent = (ILSMMemoryComponent) ctx.getComponentHolder().get(0); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 1502706..fe6afc3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -26,6 +26,7 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation; import org.apache.hyracks.data.std.api.IValueReference; @@ -454,6 +455,33 @@ } @Override + public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException { + if (!lsmIndex.isPrimaryIndex()) { + throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_SUPPORTED_IN_SECONDARY_INDEX); + } + LSMOperationType opType = LSMOperationType.DISK_COMPONENT_SCAN; + getAndEnterComponents(ctx, opType, false); + try { + ctx.getSearchOperationCallback().before(null); + lsmIndex.scanDiskComponents(ctx, cursor); + } catch (Exception e) { + exitComponents(ctx, opType, null, true); 
+ throw e; + } + } + + @Override + public void endScanDiskComponents(ILSMIndexOperationContext ctx) throws HyracksDataException { + if (ctx.getOperation() == IndexOperation.DISK_COMPONENT_SCAN) { + try { + exitComponents(ctx, LSMOperationType.DISK_COMPONENT_SCAN, null, false); + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + } + + @Override public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException { if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 20d71e2..b9714a6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -29,13 +29,13 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; +import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; -import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; 
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchPredicate; @@ -213,4 +213,10 @@ IFrameOperationCallback frameOpCallback) throws HyracksDataException { lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback); } + + @Override + public void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException { + ctx.setOperation(IndexOperation.DISK_COMPONENT_SCAN); + lsmHarness.scanDiskComponents(ctx, cursor); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index fb5cc9f..298226b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; @@ -202,4 +203,10 @@ public void forceUpsert(ITupleReference tuple) throws HyracksDataException { throw new UnsupportedOperationException("Upsert not supported by lsm inverted index."); } + + @Override + public void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException { + throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_SUPPORTED_IN_SECONDARY_INDEX); + + } } diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java new file mode 100644 index 0000000..3ba53dd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeScanDiskComponentsTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.storage.am.lsm.btree; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.util.Random; + +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.primitive.BooleanPointable; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer; +import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; +import org.apache.hyracks.storage.am.btree.OrderedIndexTestContext; +import org.apache.hyracks.storage.am.btree.OrderedIndexTestDriver; +import org.apache.hyracks.storage.am.btree.OrderedIndexTestUtils; +import org.apache.hyracks.storage.am.btree.frames.BTreeLeafFrameType; +import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; +import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree; +import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestContext; +import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory; +import org.apache.hyracks.storage.common.IIndexCursor; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +@SuppressWarnings("rawtypes") +public class LSMBTreeScanDiskComponentsTest extends OrderedIndexTestDriver { + + private final OrderedIndexTestUtils orderedIndexTestUtils; + + private final 
LSMBTreeTestHarness harness = new LSMBTreeTestHarness(); + + public LSMBTreeScanDiskComponentsTest() { + super(LSMBTreeTestHarness.LEAF_FRAMES_TO_TEST); + this.orderedIndexTestUtils = new OrderedIndexTestUtils(); + + } + + @Before + public void setUp() throws HyracksDataException { + harness.setUp(); + } + + @After + public void tearDown() throws HyracksDataException { + harness.tearDown(); + } + + @Override + protected OrderedIndexTestContext createTestContext(ISerializerDeserializer[] fieldSerdes, int numKeys, + BTreeLeafFrameType leafType, boolean filtered) throws Exception { + return LSMBTreeTestContext.create(harness.getIOManager(), harness.getVirtualBufferCaches(), + harness.getFileReference(), harness.getDiskBufferCache(), harness.getDiskFileMapProvider(), fieldSerdes, + numKeys, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), + harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallback(), + harness.getMetadataPageManagerFactory(), false); + } + + @Override + protected Random getRandom() { + return harness.getRandom(); + } + + @Override + protected void runTest(ISerializerDeserializer[] fieldSerdes, int numKeys, BTreeLeafFrameType leafType, + ITupleReference lowKey, ITupleReference highKey, ITupleReference prefixLowKey, + ITupleReference prefixHighKey) throws Exception { + OrderedIndexTestContext ctx = createTestContext(fieldSerdes, numKeys, leafType, false); + ctx.getIndex().create(); + ctx.getIndex().activate(); + // We assume all fieldSerdes are of the same type. Check the first one + // to determine which field types to generate. 
+ if (fieldSerdes[0] instanceof IntegerSerializerDeserializer) { + test(ctx, fieldSerdes); + } else if (fieldSerdes[0] instanceof UTF8StringSerializerDeserializer) { + test(ctx, fieldSerdes); + } + + ctx.getIndex().validate(); + ctx.getIndex().deactivate(); + ctx.getIndex().destroy(); + } + + protected void test(OrderedIndexTestContext ctx, ISerializerDeserializer[] fieldSerdes) + throws HyracksDataException { + ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor(); + + //component 2 contains 1 and 2 + upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes)); + upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + + //component 1 contains 1 and -2 + upsertTuple(ctx, fieldSerdes, getValue(1, fieldSerdes)); + deleteTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + + //component 0 contains 2 and 3 + upsertTuple(ctx, fieldSerdes, getValue(3, fieldSerdes)); + upsertTuple(ctx, fieldSerdes, getValue(2, fieldSerdes)); + accessor.scheduleFlush(NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback()); + + LSMBTree btree = (LSMBTree) ctx.getIndex(); + Assert.assertEquals("Check disk components", 3, btree.getImmutableComponents().size()); + + IIndexCursor cursor = accessor.createSearchCursor(false); + accessor.scanDiskComponents(cursor); + + ITupleReference tuple = getNext(cursor); + checkReturnedTuple(ctx, tuple, fieldSerdes, 2, false, getValue(1, fieldSerdes)); + + tuple = getNext(cursor); + checkReturnedTuple(ctx, tuple, fieldSerdes, 1, false, getValue(1, fieldSerdes)); + + tuple = getNext(cursor); + checkReturnedTuple(ctx, tuple, fieldSerdes, 2, false, getValue(2, fieldSerdes)); + + tuple = getNext(cursor); + checkReturnedTuple(ctx, tuple, fieldSerdes, 1, true, getValue(2, fieldSerdes)); + + tuple = getNext(cursor); + checkReturnedTuple(ctx, tuple, fieldSerdes, 0, false, 
getValue(2, fieldSerdes)); + + tuple = getNext(cursor); + checkReturnedTuple(ctx, tuple, fieldSerdes, 0, false, getValue(3, fieldSerdes)); + + Assert.assertFalse(cursor.hasNext()); + } + + protected void checkReturnedTuple(OrderedIndexTestContext ctx, ITupleReference tuple, + ISerializerDeserializer[] fieldSerdes, int componentPos, boolean antimatter, Object value) + throws HyracksDataException { + int actualComponentPos = IntegerPointable.getInteger(tuple.getFieldData(0), tuple.getFieldStart(0)); + Assert.assertEquals("Check returned component position", componentPos, actualComponentPos); + + boolean actualAntiMatter = BooleanPointable.getBoolean(tuple.getFieldData(1), tuple.getFieldStart(1)); + Assert.assertEquals("Check returned anti-matter flag", antimatter, actualAntiMatter); + + int[] permutation = new int[ctx.getFieldCount()]; + for (int i = 0; i < permutation.length; i++) { + permutation[i] = i + 2; + } + + PermutingTupleReference originalTuple = new PermutingTupleReference(permutation); + originalTuple.reset(tuple); + + for (int i = 0; i < fieldSerdes.length; i++) { + ByteArrayInputStream inStream = new ByteArrayInputStream(originalTuple.getFieldData(i), + originalTuple.getFieldStart(i), originalTuple.getFieldLength(i)); + DataInput dataIn = new DataInputStream(inStream); + Object actualObj = fieldSerdes[i].deserialize(dataIn); + if (!actualObj.equals(value)) { + fail("Actual and expected fields do not match on field " + i + ".\nExpected: " + value + "\nActual : " + + actualObj); + } + } + + } + + protected ITupleReference getNext(IIndexCursor cursor) throws HyracksDataException { + Assert.assertTrue(cursor.hasNext()); + cursor.next(); + return cursor.getTuple(); + } + + @Override + protected String getTestOpName() { + return "Disk Components Scan"; + } + + protected void upsertTuple(OrderedIndexTestContext ctx, ISerializerDeserializer[] fieldSerdes, Object value) + throws HyracksDataException { + ArrayTupleBuilder tupleBuilder = new 
ArrayTupleBuilder(ctx.getFieldCount()); + for (int i = 0; i < ctx.getFieldCount(); i++) { + tupleBuilder.addField(fieldSerdes[i], value); + } + ArrayTupleReference tuple = new ArrayTupleReference(); + tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + try { + ctx.getIndexAccessor().upsert(tuple); + } catch (HyracksDataException e) { + if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) { + throw e; + } + } + } + + protected void deleteTuple(OrderedIndexTestContext ctx, ISerializerDeserializer[] fieldSerdes, Object value) + throws HyracksDataException { + ArrayTupleBuilder deletedBuilder = new ArrayTupleBuilder(ctx.getKeyFieldCount()); + for (int i = 0; i < ctx.getKeyFieldCount(); i++) { + deletedBuilder.addField(fieldSerdes[i], value); + } + ArrayTupleReference deleteTuple = new ArrayTupleReference(); + deleteTuple.reset(deletedBuilder.getFieldEndOffsets(), deletedBuilder.getByteArray()); + try { + ctx.getIndexAccessor().delete(deleteTuple); + } catch (HyracksDataException e) { + if (e.getErrorCode() != ErrorCode.DUPLICATE_KEY) { + throw e; + } + } + } + + protected Object getValue(Object value, ISerializerDeserializer[] fieldSerdes) { + if (fieldSerdes[0] instanceof IntegerSerializerDeserializer) { + return value; + } else if (fieldSerdes[0] instanceof UTF8StringSerializerDeserializer) { + return String.valueOf(value); + } else { + return null; + } + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1791 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I31b2c67c58cb0a440c1d2c26400af322e2f1c1e5 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Luo Chen <cl...@uci.edu>