abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1554
Change subject: Fix transaction logs and optimize upserts ...................................................................... Fix transaction logs and optimize upserts Change-Id: Ice5296267033cd7debe76894c864c6411f761d83 --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java M asterixdb/asterix-transactions/pom.xml M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java D asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java 36 files changed, 118 insertions(+), 219 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/54/1554/1 diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java index d0cee55..20c69c4 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java @@ -46,15 +46,12 @@ private final List<LogicalVariable> primaryKeyLogicalVars; private final JobId jobId; private final Dataset dataset; - private final LogicalVariable upsertVar; private final boolean isSink; - public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, - LogicalVariable upsertVar, boolean isSink) { + public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) { this.jobId = jobId; this.dataset = dataset; this.primaryKeyLogicalVars = primaryKeyLogicalVars; - this.upsertVar = upsertVar; this.isSink = isSink; } @@ -98,12 +95,8 @@ for (int i = 0; i < splitsForDataset.length; i++) { datasetPartitions[i] = i; } - int upsertVarIdx = -1; - if (upsertVar != null) { - upsertVarIdx = inputSchemas[0].findVariable(upsertVar); - } IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider, - upsertVarIdx, datasetPartitions, isSink); + datasetPartitions, isSink); builder.contributeMicroOperator(op, runtime, recDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java index 9b442ae..47a37d1 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java @@ -140,7 +140,7 @@ //create the logical and physical operator CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink); CommitPOperator commitPOperator = - new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink); + new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, isSink); commitOperator.setPhysicalOperator(commitPOperator); //create ExtensionOperator and put the commitOperator in it. diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 4ee1122..b2e8640 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -98,8 +98,9 @@ this.appCtx = appCtx; this.txnSubsystem = txnSubsystem; logMgr = (LogManager) txnSubsystem.getLogManager(); - ReplicationProperties repProperties = ((IPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider() - .getAppContext()).getReplicationProperties(); + ReplicationProperties repProperties = + ((IPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()) + .getReplicationProperties(); replicationEnabled = repProperties.isParticipant(txnSubsystem.getId()); localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() .getLocalResourceRepository(); @@ -240,7 +241,6 @@ jobCommitLogCount++; break; case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: if (partitions.contains(logRecord.getResourcePartition())) { analyzeEntityCommitLog(logRecord); entityCommitLogCount++; @@ -406,7 +406,6 @@ case LogType.ENTITY_COMMIT: case LogType.ABORT: case LogType.FLUSH: - case LogType.UPSERT_ENTITY_COMMIT: case LogType.WAIT: case LogType.MARKER: //do nothing @@ -599,13 +598,12 @@ } break; case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: if (activePartitions.contains(logRecord.getResourcePartition())) { jobLoserEntity2LSNsMap.remove(tempKeyTxnId); entityCommitLogCount++; if (IS_DEBUG_MODE) { - LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN - + "]" + tempKeyTxnId); + LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]" + + tempKeyTxnId); } } break; @@ -687,10 +685,13 @@ (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { + if (logRecord.getNewOp() == IndexOperation.INSERT_BYTE) { indexAccessor.forceDelete(logRecord.getNewValue()); - } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { + } else if (logRecord.getNewOp() == IndexOperation.DELETE_BYTE) { indexAccessor.forceInsert(logRecord.getNewValue()); + } else if (logRecord.getNewOp() == IndexOperation.UPSERT_BYTE) { + // undo, upsert the old value + indexAccessor.forceUpsert(logRecord.getOldValue()); } else { throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); } @@ -706,10 +707,13 @@ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); - if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { + if (logRecord.getNewOp() == IndexOperation.INSERT_BYTE) { indexAccessor.forceInsert(logRecord.getNewValue()); - } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { + } else if (logRecord.getNewOp() == IndexOperation.DELETE_BYTE) { indexAccessor.forceDelete(logRecord.getNewValue()); + } else if (logRecord.getNewOp() == IndexOperation.UPSERT_BYTE) { + // redo, upsert the new value + indexAccessor.forceUpsert(logRecord.getNewValue()); } else { throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index cc12f36..47e212e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -161,18 +161,18 @@ return new org.apache.asterix.common.transactions.JobId((int) jobId.getId()); } - public LSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset, - IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, - ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields, - int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, - StorageComponentProvider storageComponentProvider) throws AlgebricksException, HyracksDataException { + public IFrameWriter[] getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset, IAType[] primaryKeyTypes, + ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, + Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes, + List<Integer> primaryKeyIndicators, StorageComponentProvider storageComponentProvider) + throws AlgebricksException, HyracksDataException { PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators, storageComponentProvider); IndexOperation op = IndexOperation.INSERT; IModificationOperationCallbackFactory modOpCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(), - primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true); + primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE); LSMTreeInsertDeleteOperatorDescriptor indexOpDesc = getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory); IIndexDataflowHelperFactory dataflowHelperFactory = @@ -185,7 +185,8 @@ primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true); insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc); commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); - return insertOp; + IFrameWriter[] pipeline = { insertOp, commitOp }; + return pipeline; } public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset, @@ -300,8 +301,7 @@ Index index = primaryIndexInfo.getIndex(); MetadataProvider mdProvider = new MetadataProvider(dataverse, storageComponentProvider); return dataset.getIndexDataflowHelperFactory(mdProvider, index, primaryIndexInfo.recordType, - primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, - primaryIndexInfo.mergePolicyProperties); + primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties); } public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes, @@ -434,11 +434,10 @@ private Index index; private IStorageComponentProvider storageComponentProvider; - public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, - ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, - Map<String, String> mergePolicyProperties, int[] filterFields, int[] primaryKeyIndexes, - List<Integer> primaryKeyIndicators, IStorageComponentProvider storageComponentProvider) - throws AlgebricksException { + public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, + ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, + int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, + IStorageComponentProvider storageComponentProvider) throws AlgebricksException { this.storageComponentProvider = storageComponentProvider; this.dataset = dataset; this.primaryKeyTypes = primaryKeyTypes; @@ -477,10 +476,10 @@ index = new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(), IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true, MetadataUtil.PENDING_NO_OP); - localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, - index, dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories, - primaryIndexBloomFilterKeyFields, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, - filterCmpFactories, btreeFields, filterFields, dataset.getIndexOperationTrackerFactory(index)); + localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index, + dataset, primaryIndexTypeTraits, primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields, + mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, + filterFields, dataset.getIndexOperationTrackerFactory(index)); } public Index getIndex() { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java index 1467dbf..435caf7 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java @@ -115,9 +115,10 @@ IHyracksTaskContext ctx = nc.createTestContext(true); nc.newJobId(); ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true); - LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, + LSMInsertDeleteOperatorNodePushable insertOp = + (LSMInsertDeleteOperatorNodePushable) nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, - KEY_INDICATORS_LIST, storageManager); + KEY_INDICATORS_LIST, storageManager)[0]; insertOp.open(); TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 10e8658..0dec460 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -123,9 +123,10 @@ nc.newJobId(); ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true); // Prepare insert operation - LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, + LSMInsertDeleteOperatorNodePushable insertOp = + (LSMInsertDeleteOperatorNodePushable) nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null, KEY_INDEXES, KEY_INDICATOR_LIST, - storageManager); + storageManager)[0]; insertOp.open(); TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index d691b18..4822c01 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -133,7 +133,6 @@ buffer.putInt(jobId); switch (logType) { case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: writeEntityInfo(buffer); break; case LogType.UPDATE: @@ -268,7 +267,6 @@ computeAndSetLogSize(); break; case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: if (readEntityInfo(buffer)) { computeAndSetLogSize(); } else { @@ -428,7 +426,6 @@ logSize = JOB_TERMINATE_LOG_SIZE; break; case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize; break; case LogType.FLUSH: @@ -457,7 +454,7 @@ builder.append(" LogType : ").append(LogType.toString(logType)); builder.append(" LogSize : ").append(logSize); builder.append(" JobId : ").append(jobId); - if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) { + if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) { builder.append(" DatasetId : ").append(datasetId); builder.append(" ResourcePartition : ").append(resourcePartition); builder.append(" PKHashValue : ").append(PKHashValue); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java index 269e4b9..11c45ad 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java @@ -25,7 +25,6 @@ public static final byte ENTITY_COMMIT = 2; public static final byte ABORT = 3; public static final byte FLUSH = 4; - public static final byte UPSERT_ENTITY_COMMIT = 5; public static final byte WAIT = 6; public static final byte MARKER = 7; @@ -34,7 +33,6 @@ private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT"; private static final String STRING_ABORT = "ABORT"; private static final String STRING_FLUSH = "FLUSH"; - private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT"; private static final String STRING_WAIT = "WAIT"; private static final String STRING_MARKER = "MARKER"; private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE"; @@ -51,8 +49,6 @@ return STRING_ABORT; case LogType.FLUSH: return STRING_FLUSH; - case LogType.UPSERT_ENTITY_COMMIT: - return STRING_UPSERT_ENTITY_COMMIT; case LogType.WAIT: return STRING_WAIT; case LogType.MARKER: diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 51790e6..01b4db2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -444,7 +444,7 @@ // locks and secondary index doesn't. return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(), metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(), - transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp, false); + transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp); } @Override diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index f5c6d9a..9645ccf 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -1118,7 +1118,7 @@ ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, IndexOperation.UPSERT, ResourceType.LSM_BTREE) : new UpsertOperationCallbackFactory(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, - IndexOperation.UPSERT, ResourceType.LSM_BTREE, dataset.hasMetaPart()); + IndexOperation.UPSERT, ResourceType.LSM_BTREE); LockThenSearchOperationCallbackFactory searchCallbackFactory = new LockThenSearchOperationCallbackFactory( jobId, datasetId, primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE); @@ -1319,8 +1319,7 @@ primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE) : new PrimaryIndexModificationOperationCallbackFactory( ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(), datasetId, - primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE, - dataset.hasMetaPart()); + primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); @@ -1517,7 +1516,7 @@ ResourceType.LSM_BTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, - ResourceType.LSM_BTREE, dataset.hasMetaPart()); + ResourceType.LSM_BTREE); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); @@ -1675,7 +1674,7 @@ ResourceType.LSM_RTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, - ResourceType.LSM_RTREE, dataset.hasMetaPart()); + ResourceType.LSM_RTREE); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); @@ -1885,7 +1884,7 @@ ResourceType.LSM_INVERTED_INDEX) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, - ResourceType.LSM_INVERTED_INDEX, dataset.hasMetaPart()); + ResourceType.LSM_INVERTED_INDEX); Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); IIndexDataflowHelperFactory indexDataFlowFactory = dataset.getIndexDataflowHelperFactory(this, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 2e328f9..8ab9d10 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -29,7 +29,6 @@ import org.apache.asterix.active.ActiveJobNotificationHandler; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; @@ -37,6 +36,7 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory; import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; +import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.utils.JobUtils; @@ -520,18 +520,16 @@ } else if (index.isPrimaryIndex()) { return op == IndexOperation.UPSERT ? new UpsertOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, - componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), - hasMetaPart()) + componentProvider.getTransactionSubsystemProvider(), op, index.resourceType()) : op == IndexOperation.DELETE || op == IndexOperation.INSERT ? new PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), op, - index.resourceType(), hasMetaPart()) + index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE; } else { return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT ? new SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), primaryKeyFields, - componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(), - hasMetaPart()) + componentProvider.getTransactionSubsystemProvider(), op, index.resourceType()) : NoOpOperationCallbackFactory.INSTANCE; } } @@ -572,9 +570,9 @@ } public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields, - MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) { + MetadataProvider metadataProvider, int[] datasetPartitions, boolean isSink) { return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields, - metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx, + metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions, isSink); } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index 044707a..85e2bef 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -462,7 +462,6 @@ switch (remoteLog.getLogType()) { case LogType.UPDATE: case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: //if the log partition belongs to a partitions hosted on this node, replicate it if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) { logManager.log(remoteLog); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 6869523..d07bab3 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -49,7 +49,6 @@ import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.btree.util.BTreeUtils; import org.apache.hyracks.storage.am.common.api.IIndexCursor; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.api.IndexException; import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor; @@ -84,8 +83,8 @@ private final int metaFieldIndex; private LockThenSearchOperationCallback searchCallback; - public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, - int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys, + public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition, + int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys, ARecordType recordType, int filterFieldIndex) throws HyracksDataException { super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT); this.key = new PermutingFrameTupleReference(); @@ -140,15 +139,15 @@ tb = new ArrayTupleBuilder(recordDesc.getFieldCount()); dos = tb.getDataOutput(); appender = new FrameTupleAppender(new VSizeFrame(ctx), true); - modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback( - indexHelper.getResource(), ctx, this); + modCallback = opDesc.getModificationOpCallbackFactory() + .createModificationOperationCallback(indexHelper.getResource(), ctx, this); searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory() .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this); indexAccessor = index.createAccessor(modCallback, searchCallback); cursor = indexAccessor.createSearchCursor(false); frameTuple = new FrameTupleReference(); - IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext() - .getApplicationContext().getApplicationObject(); + IAppRuntimeContext runtimeCtx = + (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, runtimeCtx.getTransactionSubsystem().getLogManager()); } catch (Exception e) { @@ -221,17 +220,19 @@ } // if with filters, append the filter if (isFiltered) { - dos.write(prevTuple.getFieldData(filterFieldIndex), - prevTuple.getFieldStart(filterFieldIndex), + dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex), prevTuple.getFieldLength(filterFieldIndex)); tb.addFieldEndOffset(); } - modCallback.setOp(Operation.DELETE); - if (firstModification) { - lsmAccessor.delete(prevTuple); - firstModification = false; - } else { - lsmAccessor.forceDelete(prevTuple); + if (isNull(tuple, numOfPrimaryKeys)) { + // Only delete if it is a delete and not upsert + modCallback.setOp(IndexOperation.DELETE_BYTE); + if (firstModification) { + lsmAccessor.delete(prevTuple); + firstModification = false; + } else { + lsmAccessor.forceDelete(prevTuple); + } } } else { prevTuple = null; @@ -246,12 +247,12 @@ cursor.reset(); } if (!isNull(tuple, numOfPrimaryKeys)) { - modCallback.setOp(Operation.INSERT); + modCallback.setOp(IndexOperation.UPSERT_BYTE); if (firstModification) { - lsmAccessor.insert(tuple); + lsmAccessor.upsert(tuple); firstModification = false; } else { - lsmAccessor.forceInsert(tuple); + lsmAccessor.forceUpsert(tuple); } recordWasInserted = true; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java index eab9cc7..4dd1eca 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java @@ -24,7 +24,6 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation; import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference; @@ -109,12 +108,12 @@ } if (!isPrevValueNull) { // previous is not null, we need to delete previous - modCallback.setOp(Operation.DELETE); + modCallback.setOp(IndexOperation.DELETE_BYTE); lsmAccessor.forceDelete(prevValueTuple); } if (!isNewNull) { // new is not null, we need to insert the new value - modCallback.setOp(Operation.INSERT); + modCallback.setOp(IndexOperation.INSERT_BYTE); lsmAccessor.forceInsert(tuple); } diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml index a65a436..2ff0ccd 100644 --- a/asterixdb/asterix-transactions/pom.xml +++ b/asterixdb/asterix-transactions/pom.xml @@ -106,11 +106,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.apache.asterix</groupId> - <artifactId>asterix-om</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java index 2a3467e..3893854 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java @@ -28,14 +28,10 @@ import org.apache.asterix.common.transactions.LogType; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback { - - private static final byte INSERT_OP = (byte) IndexOperation.INSERT.ordinal(); - private static final byte DELETE_OP = (byte) IndexOperation.DELETE.ordinal(); protected final long resourceId; protected final byte resourceType; protected final IndexOperation indexOp; @@ -81,14 +77,7 @@ txnSubsystem.getLogManager().log(logRecord); } - public void setOp(Operation op) throws HyracksDataException { - switch (op) { - case DELETE: - logRecord.setNewOp(DELETE_OP); - break; - case INSERT: - logRecord.setNewOp(INSERT_OP); - break; - } + public void setOp(byte op) throws HyracksDataException { + logRecord.setNewOp(op); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java index c627367..0747a8d 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java @@ -40,16 +40,13 @@ implements IModificationOperationCallback { private final LSMInsertDeleteOperatorNodePushable operatorNodePushable; - private final boolean logBeforeImage; public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition, - byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable, - boolean logBeforeImage) { + byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable) { super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition, resourceType, indexOp); this.operatorNodePushable = (LSMInsertDeleteOperatorNodePushable) operatorNodePushable; - this.logBeforeImage = logBeforeImage; } @Override @@ -102,11 +99,7 @@ public void found(ITupleReference before, ITupleReference after) throws HyracksDataException { try { int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields); - if (logBeforeImage) { - log(pkHash, after, before); - } else { - log(pkHash, after, null); - } + log(pkHash, after, before); } catch (ACIDException e) { throw new HyracksDataException(e); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index 4cbb8cf..fcd4cd5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -46,14 +46,11 @@ private static final long serialVersionUID = 1L; private final IndexOperation indexOp; - private final boolean logBeforeImage; public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, - ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType, - boolean logBeforeImage) { + ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) { super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; - this.logBeforeImage = logBeforeImage; } @Override @@ -72,7 +69,7 @@ Resource aResource = (Resource) resource.getResource(); IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId, primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), - aResource.partition(), resourceType, indexOp, operatorNodePushable, logBeforeImage); + aResource.partition(), resourceType, indexOp, operatorNodePushable); txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true); return modCallback; } catch (ACIDException e) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java index 974e631..845e9d7 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java @@ -37,15 +37,13 @@ implements IModificationOperationCallback { protected final IndexOperation oldOp; - private final boolean logBeforeImage; public SecondaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, - int resourcePartition, byte resourceType, IndexOperation indexOp, boolean logBeforeImage) { + int resourcePartition, byte resourceType, IndexOperation indexOp) { super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition, resourceType, indexOp); oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT : IndexOperation.DELETE; - this.logBeforeImage = logBeforeImage; } @Override @@ -57,7 +55,7 @@ public void found(ITupleReference before, ITupleReference after) throws HyracksDataException { try { int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields); - this.log(pkHash, after, logBeforeImage ? before : null); + this.log(pkHash, after, before); } catch (ACIDException e) { throw new HyracksDataException(e); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 3925bba..cce3710 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -42,14 +42,11 @@ private static final long serialVersionUID = 1L; private final IndexOperation indexOp; - private final boolean logBeforeImage; public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, - ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType, - boolean logBeforeImage) { + ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) { super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; - this.logBeforeImage = logBeforeImage; } @Override @@ -68,7 +65,7 @@ Resource aResource = (Resource) resource.getResource(); IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), - aResource.partition(), resourceType, indexOp, logBeforeImage); + aResource.partition(), resourceType, indexOp); txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false); return modCallback; } catch (ACIDException e) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java index 13d2d57..413ef65 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java @@ -29,14 +29,12 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationCallback implements IModificationOperationCallback { - private final boolean logBeforeImage; public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition, - byte resourceType, IndexOperation indexOp, boolean logBeforeImage) { + byte resourceType, IndexOperation indexOp) { super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition, resourceType, indexOp); - this.logBeforeImage = logBeforeImage; } @Override @@ -48,7 +46,7 @@ public void found(ITupleReference before, ITupleReference after) throws HyracksDataException { try { int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields); - log(pkHash, after, logBeforeImage ? before : null); + log(pkHash, after, before); } catch (ACIDException e) { throw new HyracksDataException(e); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java index 9349e93..cd702ed 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java @@ -41,14 +41,11 @@ private static final long serialVersionUID = 1L; private final IndexOperation indexOp; - private final boolean logBeforeImage; public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields, - ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType, - boolean logBeforeImage) { + ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) { super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; - this.logBeforeImage = logBeforeImage; } @Override @@ -67,7 +64,7 @@ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false); IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId, primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), - txnSubsystem, resource.getId(), aResource.partition(), resourceType, indexOp, logBeforeImage); + txnSubsystem, resource.getId(), aResource.partition(), resourceType, indexOp); txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true); return modCallback; } catch (ACIDException e) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 536e657..2752461 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -34,18 +34,16 @@ private final int[] primaryKeyFields; private final boolean isTemporaryDatasetWriteJob; private final boolean isWriteTransaction; - private final int upsertVarIdx; private int[] datasetPartitions; private final boolean isSink; public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, - boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) { + boolean isWriteTransaction, int[] datasetPartitions, boolean isSink) { this.jobId = jobId; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; this.isWriteTransaction = isWriteTransaction; - this.upsertVarIdx = upsertVarIdx; this.datasetPartitions = datasetPartitions; this.isSink = isSink; } @@ -57,13 +55,7 @@ @Override public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - if (upsertVarIdx >= 0) { - return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, - isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], - upsertVarIdx, isSink); - } else { return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); - } } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java deleted file mode 100644 index 9b2fe36..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.runtime; - -import java.nio.ByteBuffer; - -import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.utils.TransactionUtil; -import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -public class UpsertCommitRuntime extends CommitRuntime { - private final int upsertIdx; - - public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields, - boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx, - boolean isSink) { - super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, - resourcePartition, isSink); - this.upsertIdx = upsertIdx; - } - - @Override - protected void formLogRecord(ByteBuffer buffer, int t) { - boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength() - + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1); - if (isNull) { - // Previous record not found (insert) - super.formLogRecord(buffer, t); - } else { - // Previous record found (delete + insert) - int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields); - TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef, - primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT); - } - } -} diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java index 424e800..15918d1 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java @@ -255,18 +255,13 @@ LogRecord logRecord = logBufferTailReader.next(); while (logRecord != null) { if (logRecord.getLogSource() == LogSource.LOCAL) { - if (logRecord.getLogType() == LogType.ENTITY_COMMIT - || logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) { + if (logRecord.getLogType() == LogType.ENTITY_COMMIT) { reusableJobId.setId(logRecord.getJobId()); txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false); reusableDsId.setId(logRecord.getDatasetId()); txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx); txnCtx.notifyOptracker(false); - if (logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) { - // since this operation consisted of delete and insert, we need to notify the optracker twice - txnCtx.notifyOptracker(false); - } if (TransactionUtil.PROFILE_MODE) { txnSubsystem.incrementEntityCommitCount(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index d885f00..e3ad3dd 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -48,7 +48,6 @@ if (shouldReplicate) { switch (logRecord.getLogType()) { case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: case LogType.UPDATE: case LogType.FLUSH: shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId()); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java index 89d043c..70c5557 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java @@ -29,4 +29,8 @@ this.inputRecordDesc = recordDescriptor; } + public RecordDescriptor getInputRecordDescriptor() { + return inputRecordDesc; + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java index f22c239..1684923 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java @@ -29,10 +29,6 @@ * @author zheilbron */ public interface IModificationOperationCallback { - public enum Operation { - INSERT, - DELETE - } /** * This method is called on a tuple that is about to traverse an index's structure @@ -60,5 +56,5 @@ * @param op * @throws HyracksDataException */ - public void setOp(Operation op) throws HyracksDataException; + public void setOp(byte op) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java index e8ab8dc..98173bd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java @@ -61,7 +61,7 @@ } @Override - public void setOp(Operation op) throws HyracksDataException { + public void setOp(byte op) throws HyracksDataException { // Do nothing. } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java index 43e0889..b416f5a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java @@ -32,5 +32,8 @@ MERGE, FULL_MERGE, FLUSH, - REPLICATE + REPLICATE; + public static final byte INSERT_BYTE = (byte) INSERT.ordinal(); + public static final byte DELETE_BYTE = (byte) DELETE.ordinal(); + public static final byte UPSERT_BYTE = (byte) UPSERT.ordinal(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 53b8405..473e28d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -57,10 +57,10 @@ import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; @@ -286,10 +286,10 @@ case PHYSICALDELETE: case FLUSH: case DELETE: + case UPSERT: operationalComponents.add(memoryComponents.get(cmc)); break; case INSERT: - case UPSERT: addOperationalMutableComponents(operationalComponents); operationalComponents.addAll(immutableComponents); break; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index fecc674..3e3b0d4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -117,6 +117,8 @@ void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException; + void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException; + void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException; void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType) diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 4199cfb..af62cbf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -160,6 +160,12 @@ } @Override + public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException { + ctx.setOperation(IndexOperation.UPSERT); + lsmHarness.forceModify(ctx, tuple); + } + + @Override public void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException { ctx.setOperation(IndexOperation.DELETE); lsmHarness.forceModify(ctx, tuple); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index cef4257..ff09f5b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -185,4 +185,9 @@ throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index."); } + @Override + public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException { + throw new UnsupportedOperationException("Upsert not supported by lsm inverted index."); + } + } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java index 6da9334..1dc3a73 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java @@ -100,7 +100,7 @@ } @Override - public void setOp(Operation op) throws HyracksDataException { + public void setOp(byte op) throws HyracksDataException { } } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java index 15d8a60..089595f 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java @@ -67,7 +67,7 @@ } @Override - public void setOp(Operation op) throws HyracksDataException { + public void setOp(byte op) throws HyracksDataException { // Do nothing. } -- To view, visit https://asterix-gerrit.ics.uci.edu/1554 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ice5296267033cd7debe76894c864c6411f761d83 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>