abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1618

Change subject: Ensure nextFrame and flush are not called in a failed pipeline
......................................................................

Ensure nextFrame and flush are not called in a failed pipeline

Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
22 files changed, 115 insertions(+), 60 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/18/1618/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index 2eca55d..296a3ed 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -72,7 +72,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         RecordDescriptor inputRecDesc = 
recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
@@ -102,7 +102,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         currentTupleIdx = 0;
         lastFlushedTupleIdx = 0;
         flushedPartialTuples = false;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 9982477..7b11cde 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -91,7 +91,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         synchronized (writer) {
             writer.open();
         }
@@ -135,7 +135,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer frame) throws HyracksDataException {
         try {
             total++;
             if (consumer.cause() != null) {
@@ -390,7 +390,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void doFlush() throws HyracksDataException {
         synchronized (writer) {
             writer.flush();
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
index 1bdc7e1..a8ca426 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/SyncFeedRuntimeInputHandler.java
@@ -35,12 +35,12 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         writer.open();
     }
 
     @Override
-    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer frame) throws HyracksDataException {
         while (frame != null) {
             try {
                 writer.nextFrame(frame);
@@ -65,7 +65,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    protected void doFlush() throws HyracksDataException {
         writer.flush();
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
index 69a2020..6e2b190 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
@@ -45,7 +45,7 @@
 
     // We override the open function to search a specific version of the index
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         writer.open();
         ExternalBTreeWithBuddyDataflowHelper dataFlowHelper = 
(ExternalBTreeWithBuddyDataflowHelper) indexHelper;
         accessor = new FrameTupleAccessor(inputRecDesc);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index c95a4a7..5e4db84 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -54,7 +54,7 @@
     // when creating the bulkLoader, it creates a transaction bulk loader
     // It uses the bulkLoader to insert delete tuples for the deleted files
     @Override
-    public void open() throws HyracksDataException {
+    public void doOpen() throws HyracksDataException {
         RecordDescriptor recDesc = 
recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(recDesc);
         indexHelper.open();
@@ -76,7 +76,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    public void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
index d4718a9..8b888e1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalLookupOperatorDescriptor.java
@@ -78,7 +78,7 @@
             private boolean indexOpen = false;
 
             @Override
-            public void open() throws HyracksDataException {
+            protected void doOpen() throws HyracksDataException {
                 try {
                     adapter = adapterFactory.createAdapter(ctx, partition,
                             
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), 
snapshotAccessor, writer);
@@ -122,7 +122,7 @@
             }
 
             @Override
-            public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+            protected void doNextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 try {
                     adapter.nextFrame(buffer);
                 } catch (Throwable th) {
@@ -131,7 +131,7 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
+            protected void doFlush() throws HyracksDataException {
                 adapter.flush();
             }
         };
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
index 237e2c0..e3415cc 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
@@ -44,7 +44,7 @@
 
     // We override this method to specify the searched version of the index
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         writer.open();
         accessor = new FrameTupleAccessor(inputRecDesc);
         indexHelper.open();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index bdc11f5..84176cb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -73,12 +73,12 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    public void doOpen() throws HyracksDataException {
         writer.open();
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    public void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         writer.nextFrame(buffer);
     }
 
@@ -88,7 +88,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void doFlush() throws HyracksDataException {
         writer.flush();
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index fbdbece..5fa03c3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -113,7 +113,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         ActiveRuntimeId runtimeId = new 
ActiveRuntimeId(connectionId.getFeedId(), runtimeType.toString(), partition);
         try {
             initializeNewFeedRuntime(runtimeId);
@@ -138,7 +138,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
             FeedUtils.processFeedMessage(buffer, message, fta);
             writer.nextFrame(buffer);
@@ -161,7 +161,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    protected void doFlush() throws HyracksDataException {
         writer.flush();
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 0bb27db..d3e5676 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -107,9 +107,9 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
-        ActiveRuntimeId runtimeId = new 
ActiveRuntimeId(connectionId.getFeedId(),
-                runtimeType.toString() + "." + targetId, partition);
+    protected void doOpen() throws HyracksDataException {
+        ActiveRuntimeId runtimeId =
+                new ActiveRuntimeId(connectionId.getFeedId(), 
runtimeType.toString() + "." + targetId, partition);
         try {
             initializeNewFeedRuntime(runtimeId);
             insertOperator.open();
@@ -123,8 +123,7 @@
         fta = new 
FrameTupleAccessor(recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(),
 0));
         insertOperator.setOutputFrameWriter(0, writer, recordDesc);
         if (insertOperator instanceof LSMInsertDeleteOperatorNodePushable) {
-            LSMInsertDeleteOperatorNodePushable indexOp =
-                    (LSMInsertDeleteOperatorNodePushable) insertOperator;
+            LSMInsertDeleteOperatorNodePushable indexOp = 
(LSMInsertDeleteOperatorNodePushable) insertOperator;
             if (!indexOp.isPrimary()) {
                 writer = insertOperator;
                 return;
@@ -139,7 +138,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
             FeedUtils.processFeedMessage(buffer, message, fta);
             writer.nextFrame(buffer);
@@ -160,7 +159,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    protected void doFlush() throws HyracksDataException {
         writer.flush();
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 1f18c97..b51db6f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -125,7 +125,7 @@
     // have been obtained through searchForUpsert operation
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         RecordDescriptor inputRecDesc = 
recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
@@ -202,7 +202,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessor;
         int tupleCount = accessor.getTupleCount();
@@ -333,7 +333,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void doFlush() throws HyracksDataException {
         // No op since nextFrame flushes by default
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 0db5ff0..8b46026 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -65,8 +65,8 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
-        super.open();
+    protected void doOpen() throws HyracksDataException {
+        super.doOpen();
         abstractModCallback = (AbstractIndexModificationOperationCallback) 
modCallback;
     }
 
@@ -96,7 +96,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
index ec31eb7..5d27b07 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractUnaryInputUnaryOutputOperatorNodePushable.java
@@ -18,10 +18,19 @@
  */
 package org.apache.hyracks.dataflow.std.base;
 
+import java.nio.ByteBuffer;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractUnaryInputUnaryOutputOperatorNodePushable 
extends AbstractUnaryOutputOperatorNodePushable
         implements IFrameWriter {
+    private static final Logger LOGGER =
+            
Logger.getLogger(AbstractUnaryInputUnaryOutputOperatorNodePushable.class.getName());
+    private boolean failed = false;
+
     @Override
     public final IFrameWriter getInputFrameWriter(int index) {
         return this;
@@ -31,4 +40,46 @@
     public final int getInputArity() {
         return 1;
     }
+
+    @Override
+    public final void open() throws HyracksDataException {
+        failed = true;
+        doOpen();
+        failed = false;
+    }
+
+    protected abstract void doOpen() throws HyracksDataException;
+
+    @Override
+    public final void nextFrame(ByteBuffer buffer) throws HyracksDataException 
{
+        if (failed) {
+            LOGGER.log(Level.WARNING, "next Frame called on a failed writer");
+            return;
+        }
+        failed = true;
+        doNextFrame(buffer);
+        failed = false;
+    }
+
+    protected abstract void doNextFrame(ByteBuffer buffer) throws 
HyracksDataException;
+
+    @Override
+    public final void flush() throws HyracksDataException {
+        if (failed) {
+            LOGGER.log(Level.WARNING, "flush called on a failed writer");
+            return;
+        }
+        failed = true;
+        doFlush();
+        failed = false;
+    }
+
+    public final void unfail() {
+        if (!failed) {
+            LOGGER.log(Level.WARNING, "unfail called on a not failed writer");
+        }
+        failed = false;
+    }
+
+    protected abstract void doFlush() throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 2acc4db..06e51e3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         final IBinaryComparator[] comparators = new 
IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         pgw.nextFrame(buffer);
     }
 
@@ -78,7 +78,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    protected void doFlush() throws HyracksDataException {
         pgw.flush();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
index a07540b..3ba2a20 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/IdentityOperatorDescriptor.java
@@ -43,12 +43,12 @@
                     throws HyracksDataException {
         return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
             @Override
-            public void open() throws HyracksDataException {
+            protected void doOpen() throws HyracksDataException {
                 writer.open();
             }
 
             @Override
-            public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+            protected void doNextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 writer.nextFrame(buffer);
             }
 
@@ -63,7 +63,7 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
+            protected void doFlush() throws HyracksDataException {
                 writer.flush();
             }
         };
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
index 084c9ab..396655d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/LimitOperatorDescriptor.java
@@ -54,7 +54,7 @@
             private boolean finished;
 
             @Override
-            public void open() throws HyracksDataException {
+            protected void doOpen() throws HyracksDataException {
                 fta = new FrameTupleAccessor(recordDescriptors[0]);
                 currentSize = 0;
                 finished = false;
@@ -62,7 +62,7 @@
             }
 
             @Override
-            public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+            protected void doNextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                 if (!finished) {
                     fta.reset(buffer);
                     int count = fta.getTupleCount();
@@ -94,7 +94,7 @@
             }
 
             @Override
-            public void flush() throws HyracksDataException {
+            protected void doFlush() throws HyracksDataException {
                 writer.flush();
             }
         };
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index fd4b094..8e3b5bc 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -95,14 +95,14 @@
                 private boolean failed = false;
 
                 @Override
-                public void open() throws HyracksDataException {
+                protected void doOpen() throws HyracksDataException {
                     state = new 
MaterializerTaskState(ctx.getJobletContext().getJobId(),
                             new TaskId(getActivityId(), partition));
                     state.open(ctx);
                 }
 
                 @Override
-                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+                protected void doNextFrame(ByteBuffer buffer) throws 
HyracksDataException {
                     state.appendFrame(buffer);
                 }
 
@@ -116,6 +116,11 @@
                     state.close();
                     state.writeOut(writer, new VSizeFrame(ctx), failed);
                 }
+
+                @Override
+                protected void doFlush() throws HyracksDataException {
+                    // No op
+                }
             };
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 6439279..e7d3d55 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         RecordDescriptor recDesc = 
recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(recDesc);
         indexHelper.open();
@@ -77,7 +77,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
 
@@ -116,7 +116,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void doFlush() throws HyracksDataException {
         writer.flush();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4f01978..d0470f7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -70,7 +70,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         RecordDescriptor inputRecDesc = 
recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
@@ -93,7 +93,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
@@ -167,7 +167,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    protected void doFlush() throws HyracksDataException {
         writer.flush();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index c089854..e8e8f81 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -107,7 +107,7 @@
     protected abstract int getFieldCount();
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
@@ -174,7 +174,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
         try {
@@ -190,7 +190,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    protected void doFlush() throws HyracksDataException {
         appender.flush(writer);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index b51d132..a862357 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -37,8 +37,8 @@
     protected FrameTupleAppender appender;
 
     @Override
-    public void open() throws HyracksDataException {
-        super.open();
+    protected void doOpen() throws HyracksDataException {
+        super.doOpen();
         appender = new FrameTupleAppender(writeBuffer);
     }
 
@@ -49,7 +49,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int nextFlushTupleIndex = 0;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 002457b..0f4e9bf 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -65,7 +65,7 @@
     }
 
     @Override
-    public void open() throws HyracksDataException {
+    protected void doOpen() throws HyracksDataException {
         writer.open();
         accessor = new FrameTupleAccessor(inputRecDesc);
         builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
@@ -74,7 +74,7 @@
     }
 
     @Override
-    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+    protected void doNextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
 
@@ -166,7 +166,7 @@
     }
 
     @Override
-    public void flush() throws HyracksDataException {
+    public void doFlush() throws HyracksDataException {
         appender.flush(writer);
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1618
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9827b06f640858f27ec1bcca2a39991780bee3b1
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to