abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1613
Change subject: Fix creation of callback factories ...................................................................... Fix creation of callback factories Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629 --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java 8 files changed, 89 insertions(+), 103 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/13/1613/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index 12f28c5..ac3caf3 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.active; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.api.IAppRuntimeContext; import org.apache.hyracks.api.comm.IFrameWriter; @@ -28,6 +31,7 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutputSourceOperatorNodePushable implements IActiveRuntime { + private final Logger LOGGER = Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName()); protected final IHyracksTaskContext ctx; protected final ActiveManager activeManager; /** A unique identifier for the runtime **/ @@ -79,6 +83,7 @@ @Override public final void initialize() throws HyracksDataException { + LOGGER.log(Level.INFO, "initialize() called on ActiveSourceOperatorNodePushable"); activeManager.registerRuntime(this); try { // notify cc that runtime has been registered @@ -86,15 +91,18 @@ ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null); start(); } catch (InterruptedException e) { + LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e); Thread.currentThread().interrupt(); throw new HyracksDataException(e); } catch (Exception e) { + LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e); throw new HyracksDataException(e); } finally { synchronized (this) { done = true; notifyAll(); } + LOGGER.log(Level.INFO, "initialize() returning on ActiveSourceOperatorNodePushable"); } } @@ -105,10 +113,12 @@ ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null); } catch (Exception e) { + LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e); throw new HyracksDataException(e); + } finally { + LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable"); } } - @Override public final IFrameWriter getInputFrameWriter(int index) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java index 2ff0617..e99cba5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; @@ -50,11 +51,13 @@ IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory, int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory, - IModificationOperationCallbackFactory modificationOpCallbackFactory, String indexName, + IModificationOperationCallbackFactory modificationOpCallbackFactory, + ISearchOperationCallbackFactory searchCallbackFactory, String indexName, IPageManagerFactory pageManagerFactory) { super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, op, dataflowHelperFactory, tupleFilterFactory, modificationOpCallbackFactory, + searchCallbackFactory, pageManagerFactory); this.indexName = indexName; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 39ea54d..175fe2b 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -31,14 +31,12 @@ import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.common.context.ITransactionSubsystemProvider; import org.apache.asterix.common.dataflow.IApplicationContextInfo; import org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor; import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; @@ -89,15 +87,6 @@ import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.runtime.utils.RuntimeComponentsProvider; -import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; -import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory; -import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -146,7 +135,6 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; @@ -157,7 +145,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> { private final IStorageComponentProvider storaegComponentProvider; - private final ITransactionSubsystemProvider txnSubsystemProvider; private final IMetadataPageManagerFactory metadataPageManagerFactory; private final IPrimitiveValueProviderFactory primitiveValueProviderFactory; private final StorageProperties storageProperties; @@ -182,7 +169,6 @@ this.storaegComponentProvider = componentProvider; storageProperties = AppContextInfo.INSTANCE.getStorageProperties(); libraryManager = AppContextInfo.INSTANCE.getLibraryManager(); - txnSubsystemProvider = componentProvider.getTransactionSubsystemProvider(); metadataPageManagerFactory = componentProvider.getMetadataPageManagerFactory(); primitiveValueProviderFactory = componentProvider.getPrimitiveValueProviderFactory(); } @@ -521,27 +507,13 @@ IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc; spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName()); - - ISearchOperationCallbackFactory searchCallbackFactory; - if (isSecondary) { - searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new SecondaryIndexSearchOperationCallbackFactory(); - } else { - int datasetId = dataset.getDatasetId(); - int[] primaryKeyFields = new int[numPrimaryKeys]; - for (int i = 0; i < numPrimaryKeys; i++) { - primaryKeyFields[i] = i; - } - - /** - * Due to the read-committed isolation level, - * we may acquire very short duration lock(i.e., instant lock) for readers. - */ - searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE - : new PrimaryIndexInstantSearchOperationCallbackFactory( - ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId(), - datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); + int[] primaryKeyFields = new int[numPrimaryKeys]; + for (int i = 0; i < numPrimaryKeys; i++) { + primaryKeyFields[i] = i; } + + ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( + storaegComponentProvider, theIndex, jobId, IndexOperation.SEARCH, primaryKeyFields); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); RuntimeComponentsProvider rtcProvider = RuntimeComponentsProvider.RUNTIME_PROVIDER; @@ -646,8 +618,13 @@ } Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); - ISearchOperationCallbackFactory searchCallbackFactory = - temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory(); + int[] primaryKeyFields = new int[numPrimaryKeys]; + for (int i = 0; i < numPrimaryKeys; i++) { + primaryKeyFields[i] = i; + } + + ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, IndexOperation.SEARCH, primaryKeyFields); RTreeSearchOperatorDescriptor rtreeSearchOp; IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, recType, metaType, compactionInfo.first, compactionInfo.second); @@ -1011,8 +988,8 @@ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc; spPc = metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName, false); - ISearchOperationCallbackFactory searchOpCallbackFactory = - temp ? NoOpOperationCallbackFactory.INSTANCE : new SecondaryIndexSearchOperationCallbackFactory(); + ISearchOperationCallbackFactory searchOpCallbackFactory = dataset + .getSearchCallbackFactory(storaegComponentProvider, fileIndex, jobId, IndexOperation.SEARCH, null); // Create the operator ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory, outRecDesc, indexDataflowHelperFactory, retainInput, appContext.getIndexLifecycleManagerProvider(), @@ -1091,14 +1068,11 @@ primaryKeyFields[i] = i; } - IModificationOperationCallbackFactory modificationCallbackFactory = temp - ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, - primaryKeyFields, txnSubsystemProvider, Operation.UPSERT, ResourceType.LSM_BTREE) - : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, - Operation.UPSERT, ResourceType.LSM_BTREE); + IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( + storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); - LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory( - jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); + ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( + storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); @@ -1281,18 +1255,14 @@ getSplitProviderAndConstraints(dataset); // prepare callback - int datasetId = dataset.getDatasetId(); int[] primaryKeyFields = new int[numKeys]; for (i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; } - IModificationOperationCallbackFactory modificationCallbackFactory = temp - ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory( - ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId, - primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE) - : new PrimaryIndexModificationOperationCallbackFactory( - ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId, - primaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), ResourceType.LSM_BTREE); + IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( + storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields); + ISearchOperationCallbackFactory searcgCallbackFactory = dataset + .getSearchCallbackFactory(storaegComponentProvider, primaryIndex, jobId, indexOp, primaryKeyFields); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); @@ -1310,7 +1280,7 @@ op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true, - indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, + indexName, null, modificationCallbackFactory, searcgCallbackFactory, metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); @@ -1483,15 +1453,10 @@ // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); - int datasetId = dataset.getDatasetId(); - IModificationOperationCallbackFactory modificationCallbackFactory = temp - ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), - ResourceType.LSM_BTREE) - : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), - ResourceType.LSM_BTREE); - + IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); + ISearchOperationCallbackFactory searchOpCallbackFactory = dataset.getSearchCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType, @@ -1508,13 +1473,13 @@ op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false, - indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, - prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory()); + indexName, null, modificationCallbackFactory, searchOpCallbackFactory, prevFieldPermutation, + metadataPageManagerFactory, dataset.getFrameOpCallbackFactory()); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory, - false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, + false, indexName, null, modificationCallbackFactory, searchOpCallbackFactory, metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); @@ -1641,15 +1606,10 @@ // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); - int datasetId = dataset.getDatasetId(); - IModificationOperationCallbackFactory modificationCallbackFactory = temp - ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), - ResourceType.LSM_RTREE) - : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), - ResourceType.LSM_RTREE); - + IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); + ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(this, @@ -1666,13 +1626,13 @@ op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false, - indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, - prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory()); + indexName, null, modificationCallbackFactory, searchCallbackFactory, prevFieldPermutation, + metadataPageManagerFactory, dataset.getFrameOpCallbackFactory()); } else { op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits, comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory, - false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE, + false, indexName, null, modificationCallbackFactory, searchCallbackFactory, metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); @@ -1852,13 +1812,10 @@ // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); int datasetId = dataset.getDatasetId(); - IModificationOperationCallbackFactory modificationCallbackFactory = temp - ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), - ResourceType.LSM_INVERTED_INDEX) - : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, - modificationCallbackPrimaryKeyFields, txnSubsystemProvider, Operation.get(indexOp), - ResourceType.LSM_INVERTED_INDEX); + IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); + ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( + storaegComponentProvider, secondaryIndex, jobId, indexOp, modificationCallbackPrimaryKeyFields); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this, @@ -1875,14 +1832,16 @@ op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, - fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, + fieldPermutation, indexDataFlowFactory, filterFactory, modificationCallbackFactory, + searchCallbackFactory, indexName, prevFieldPermutation, metadataPageManagerFactory); } else { op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(), splitsAndConstraint.first, appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp, - indexDataFlowFactory, filterFactory, modificationCallbackFactory, indexName, + indexDataFlowFactory, filterFactory, modificationCallbackFactory, searchCallbackFactory, + indexName, metadataPageManagerFactory); } return new Pair<>(op, splitsAndConstraint.second); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index cd8cf3b..ff54642 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -68,6 +68,7 @@ import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory; +import org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory; import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory; @@ -519,9 +520,13 @@ int[] primaryKeyFields) throws AlgebricksException { if (getDatasetDetails().isTemp()) { return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT - ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), - primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), - index.resourceType()) + ? index.isPrimaryIndex() + ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, + primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), + Operation.get(op), index.resourceType()) + : new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), + primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), + Operation.get(op), index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE; } else if (index.isPrimaryIndex()) { return op == IndexOperation.UPSERT diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java index f1547a8..9d0b917 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java @@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; @@ -50,11 +51,11 @@ IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory, int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory, - String indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory) { + ISearchOperationCallbackFactory searchCallbackFactory, String indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory) { super(spec, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory, - modificationOpCallbackFactory, indexName, pageManagerFactory); + modificationOpCallbackFactory, searchCallbackFactory, indexName, pageManagerFactory); this.prevFieldPermutation = prevFieldPermutation; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java index b37ecae..e6cfd2a 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java @@ -54,13 +54,13 @@ IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation, IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory, - IModificationOperationCallbackFactory modificationOpCallbackProvider, + IModificationOperationCallbackFactory modificationOpCallbackFactory, ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation, IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) { super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT, dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory, - modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory); + modificationOpCallbackFactory, searchOpCallbackProvider, pageManagerFactory); this.prevValuePermutation = prevValuePermutation; this.frameOpCallbackFactory = frameOpCallbackFactory; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java index e9c0e5f..da3e986 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java @@ -18,20 +18,27 @@ */ package org.apache.hyracks.storage.am.lsm.common.impls; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -public enum SynchronousScheduler implements ILSMIOOperationScheduler { - INSTANCE; +public class SynchronousScheduler implements ILSMIOOperationScheduler { + private static final Logger LOGGER = Logger.getLogger(SynchronousScheduler.class.getName()); + public static final SynchronousScheduler INSTANCE = new SynchronousScheduler(); + + private SynchronousScheduler() { + } @Override public void scheduleOperation(ILSMIOOperation operation) throws HyracksDataException { try { operation.call(); - } catch (IndexException e) { - throw new HyracksDataException(e); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "IO Operation failed", e); + throw HyracksDataException.create(e); } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java index 46201d5..a342370 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java @@ -31,9 +31,9 @@ import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; +import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; @@ -55,11 +55,12 @@ int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory, + ISearchOperationCallbackFactory searchCallbackFactory, IPageManagerFactory pageManagerFactory) { super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory, dataflowHelperFactory, tupleFilterFactory, false, false, - null, NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, + null, NoOpLocalResourceFactoryProvider.INSTANCE, searchCallbackFactory, modificationOpCallbackFactory, pageManagerFactory); this.fieldPermutation = fieldPermutation; -- To view, visit https://asterix-gerrit.ics.uci.edu/1613 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>