abdullah alamoudi has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1618
Change subject: Ensure nextFrame and flush are not called in a failed pipeline ...................................................................... Ensure nextFrame and flush are not called in a failed pipeline Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1 --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java 22 files changed, 115 insertions(+), 60 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/18/1618/1 diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java index 2eca55d..296a3ed 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java @@ -72,7 +72,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(inputRecDesc); writeBuffer = new VSizeFrame(ctx); @@ -102,7 +102,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { currentTupleIdx = 0; lastFlushedTupleIdx = 0; flushedPartialTuples = false; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index 9982477..7b11cde 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -91,7 +91,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { synchronized (writer) { writer.open(); } @@ -135,7 +135,7 @@ } @Override - public void nextFrame(ByteBuffer frame) throws HyracksDataException { + protected void doNextFrame(ByteBuffer frame) throws HyracksDataException { try { total++; if (consumer.cause() != null) { @@ -390,7 +390,7 @@ } @Override - public void flush() throws HyracksDataException { + public void doFlush() throws HyracksDataException { synchronized (writer) { writer.flush(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java index 1bdc7e1..a8ca426 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java @@ -35,12 +35,12 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { writer.open(); } @Override - public void nextFrame(ByteBuffer frame) throws HyracksDataException { + protected void doNextFrame(ByteBuffer frame) throws HyracksDataException { while (frame != null) { try { writer.nextFrame(frame); @@ -65,7 +65,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { writer.flush(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java index 69a2020..6e2b190 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java @@ -45,7 +45,7 @@ // We override the open function to search a specific version of the index @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { writer.open(); ExternalBTreeWithBuddyDataflowHelper dataFlowHelper = (ExternalBTreeWithBuddyDataflowHelper) indexHelper; accessor = new FrameTupleAccessor(inputRecDesc); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java index c95a4a7..5e4db84 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java @@ -54,7 +54,7 @@ // when creating the bulkLoader, it creates a transaction bulk loader // It uses the bulkLoader to insert delete tuples for the deleted files @Override - public void open() throws HyracksDataException { + public void doOpen() throws HyracksDataException { RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(recDesc); indexHelper.open(); @@ -76,7 +76,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + public void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); for (int i = 0; i < tupleCount; i++) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java index d4718a9..8b888e1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java @@ -78,7 +78,7 @@ private boolean indexOpen = false; @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { try { adapter = adapterFactory.createAdapter(ctx, partition, recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), snapshotAccessor, writer); @@ -122,7 +122,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { try { adapter.nextFrame(buffer); } catch (Throwable th) { @@ -131,7 +131,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { adapter.flush(); } }; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java index 237e2c0..e3415cc 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java @@ -44,7 +44,7 @@ // We override this method to specify the searched version of the index @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { writer.open(); accessor = new FrameTupleAccessor(inputRecDesc); indexHelper.open(); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java index bdc11f5..84176cb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java @@ -73,12 +73,12 @@ } @Override - public void open() throws HyracksDataException { + public void doOpen() throws HyracksDataException { writer.open(); } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + public void doNextFrame(ByteBuffer buffer) throws HyracksDataException { writer.nextFrame(buffer); } @@ -88,7 +88,7 @@ } @Override - public void flush() throws HyracksDataException { + public void doFlush() throws HyracksDataException { writer.flush(); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java index fbdbece..5fa03c3 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java @@ -113,7 +113,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString(), partition); try { initializeNewFeedRuntime(runtimeId); @@ -138,7 +138,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { try { FeedUtils.processFeedMessage(buffer, message, fta); writer.nextFrame(buffer); @@ -161,7 +161,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { writer.flush(); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java index 0bb27db..d3e5676 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java @@ -107,9 +107,9 @@ } @Override - public void open() throws HyracksDataException { - ActiveRuntimeId runtimeId = new ActiveRuntimeId(connectionId.getFeedId(), - runtimeType.toString() + "." + targetId, partition); + protected void doOpen() throws HyracksDataException { + ActiveRuntimeId runtimeId = + new ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString() + "." + targetId, partition); try { initializeNewFeedRuntime(runtimeId); insertOperator.open(); @@ -123,8 +123,7 @@ fta = new FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0)); insertOperator.setOutputFrameWriter(0, writer, recordDesc); if (insertOperator instanceof LSMInsertDeleteOperatorNodePushable) { - LSMInsertDeleteOperatorNodePushable indexOp = - (LSMInsertDeleteOperatorNodePushable) insertOperator; + LSMInsertDeleteOperatorNodePushable indexOp = (LSMInsertDeleteOperatorNodePushable) insertOperator; if (!indexOp.isPrimary()) { writer = insertOperator; return; @@ -139,7 +138,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { try { FeedUtils.processFeedMessage(buffer, message, fta); writer.nextFrame(buffer); @@ -160,7 +159,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { writer.flush(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 1f18c97..b51db6f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -125,7 +125,7 @@ // have been obtained through searchForUpsert operation @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(inputRecDesc); writeBuffer = new VSizeFrame(ctx); @@ -202,7 +202,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor; int tupleCount = accessor.getTupleCount(); @@ -333,7 +333,7 @@ } @Override - public void flush() throws HyracksDataException { + public void doFlush() throws HyracksDataException { // No op since nextFrame flushes by default } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java index 0db5ff0..8b46026 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java @@ -65,8 +65,8 @@ } @Override - public void open() throws HyracksDataException { - super.open(); + protected void doOpen() throws HyracksDataException { + super.doOpen(); abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback; } @@ -96,7 +96,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor; int tupleCount = accessor.getTupleCount(); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java index ec31eb7..5d27b07 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java @@ -18,10 +18,19 @@ */ package org.apache.hyracks.dataflow.std.base; +import java.nio.ByteBuffer; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.hyracks.api.comm.IFrameWriter; +import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class AbstractUnaryInputUnaryOutputOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable implements IFrameWriter { + private static final Logger LOGGER = + Logger.getLogger(AbstractUnaryInputUnaryOutputOperatorNodePushable.class.getName()); + private boolean failed = false; + @Override public final IFrameWriter getInputFrameWriter(int index) { return this; @@ -31,4 +40,46 @@ public final int getInputArity() { return 1; } + + @Override + public final void open() throws HyracksDataException { + failed = true; + doOpen(); + failed = false; + } + + protected abstract void doOpen() throws HyracksDataException; + + @Override + public final void nextFrame(ByteBuffer buffer) throws HyracksDataException { + if (failed) { + LOGGER.log(Level.WARNING, "next Frame called on a failed writer"); + return; + } + failed = true; + doNextFrame(buffer); + failed = false; + } + + protected abstract void doNextFrame(ByteBuffer buffer) throws HyracksDataException; + + @Override + public final void flush() throws HyracksDataException { + if (failed) { + LOGGER.log(Level.WARNING, "flush called on a failed writer"); + return; + } + failed = true; + doFlush(); + failed = false; + } + + public final void unfail() { + if (!failed) { + LOGGER.log(Level.WARNING, "unfail called on a not failed writer"); + } + failed = false; + } + + protected abstract void doFlush() throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java index 2acc4db..06e51e3 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java @@ -52,7 +52,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); @@ -63,7 +63,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { pgw.nextFrame(buffer); } @@ -78,7 +78,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { pgw.flush(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java index a07540b..3ba2a20 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java @@ -43,12 +43,12 @@ throws HyracksDataException { return new AbstractUnaryInputUnaryOutputOperatorNodePushable() { @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { writer.open(); } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { writer.nextFrame(buffer); } @@ -63,7 +63,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { writer.flush(); } }; diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java index 084c9ab..396655d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java @@ -54,7 +54,7 @@ private boolean finished; @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { fta = new FrameTupleAccessor(recordDescriptors[0]); currentSize = 0; finished = false; @@ -62,7 +62,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { if (!finished) { fta.reset(buffer); int count = fta.getTupleCount(); @@ -94,7 +94,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { writer.flush(); } }; diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java index fd4b094..8e3b5bc 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java @@ -95,14 +95,14 @@ private boolean failed = false; @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition)); state.open(ctx); } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { state.appendFrame(buffer); } @@ -116,6 +116,11 @@ state.close(); state.writeOut(writer, new VSizeFrame(ctx), failed); } + + @Override + protected void doFlush() throws HyracksDataException { + // No op + } }; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java index 6439279..e7d3d55 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java @@ -63,7 +63,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(recDesc); indexHelper.open(); @@ -77,7 +77,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); @@ -116,7 +116,7 @@ } @Override - public void flush() throws HyracksDataException { + public void doFlush() throws HyracksDataException { writer.flush(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java index 4f01978..d0470f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java @@ -70,7 +70,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0); accessor = new FrameTupleAccessor(inputRecDesc); writeBuffer = new VSizeFrame(ctx); @@ -93,7 +93,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); for (int i = 0; i < tupleCount; i++) { @@ -167,7 +167,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { writer.flush(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index c089854..e8e8f81 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -107,7 +107,7 @@ protected abstract int getFieldCount(); @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { writer.open(); indexHelper.open(); index = indexHelper.getIndexInstance(); @@ -174,7 +174,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); try { @@ -190,7 +190,7 @@ } @Override - public void flush() throws HyracksDataException { + protected void doFlush() throws HyracksDataException { appender.flush(writer); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java index b51d132..a862357 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java @@ -37,8 +37,8 @@ protected FrameTupleAppender appender; @Override - public void open() throws HyracksDataException { - super.open(); + protected void doOpen() throws HyracksDataException { + super.doOpen(); appender = new FrameTupleAppender(writeBuffer); } @@ -49,7 +49,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor; int nextFlushTupleIndex = 0; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java index 002457b..0f4e9bf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java @@ -65,7 +65,7 @@ } @Override - public void open() throws HyracksDataException { + protected void doOpen() throws HyracksDataException { writer.open(); accessor = new FrameTupleAccessor(inputRecDesc); builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount()); @@ -74,7 +74,7 @@ } @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int tupleCount = accessor.getTupleCount(); @@ -166,7 +166,7 @@ } @Override - public void flush() throws HyracksDataException { + public void doFlush() throws HyracksDataException { appender.flush(writer); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1618 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>