Yingyi Bu has submitted this change and it was merged.

Change subject: [ASTERIXDB-1943][API][STO] Make rebalance idempotent.
......................................................................


[ASTERIXDB-1943][API][STO] Make rebalance idempotent.

- user model changes:
  added rebalance cancellation HTTP API.

- storage format changes: no

- interface changes: no

Details:
- add a HTTP API for cancelling a rebalance request;
- clean up leftover states at the beginning of a
  rebalance request;
- add tests for rebalance cancellation.

Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1821
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
18 files changed, 464 insertions(+), 79 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; Verified

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 53d8f6b..3bd1be5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -21,12 +21,18 @@
 import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.PrintWriter;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -61,20 +67,39 @@
     private static final String METADATA = "Metadata";
     private final ICcApplicationContext appCtx;
 
+    // One-at-a-time thread executor, for rebalance tasks.
+    private final ExecutorService executor = 
Executors.newSingleThreadExecutor();
+
+    // A queue that maintains submitted rebalance requests.
+    private final Queue<Future> rebalanceTasks = new ArrayDeque<>();
+
+    // A queue that tracks the termination of rebalance threads.
+    private final Queue<CountDownLatch> rebalanceFutureTerminated = new 
ArrayDeque<>();
+
     public RebalanceApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx) {
         super(ctx, paths);
         this.appCtx = appCtx;
     }
 
     @Override
-    protected void post(IServletRequest request, IServletResponse response) {
-        PrintWriter out = response.writer();
-        ObjectMapper om = new ObjectMapper();
-        ObjectNode jsonResponse = om.createObjectNode();
+    protected void delete(IServletRequest request, IServletResponse response) {
         try {
             // Sets the content type.
             HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
+            // Cancels all rebalance requests.
+            cancelRebalance();
+            // Sends the response back.
+            sendResponse(response, HttpResponseStatus.OK, "rebalance tasks are 
cancelled");
+        } catch (Exception e) {
+            // Sends back and logs internal error if any exception happens 
during cancellation.
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, 
e.getMessage(), e);
+        }
 
+    }
+
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) {
+        try {
             // Gets dataverse, dataset, and target nodes for rebalance.
             String dataverseName = request.getParameter("dataverseName");
             String datasetName = request.getParameter("datasetName");
@@ -82,31 +107,66 @@
 
             // Parses and check target nodes.
             if (nodes == null) {
-                sendResponse(out, jsonResponse, response, 
HttpResponseStatus.BAD_REQUEST,
-                        "nodes are not given");
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST, "nodes 
are not given");
                 return;
             }
             String nodesString = StringUtils.strip(nodes, "\"'").trim();
             String[] targetNodes = nodesString.split(",");
             if ("".equals(nodesString)) {
-                sendResponse(out, jsonResponse, response, 
HttpResponseStatus.BAD_REQUEST,
-                        "target nodes should not be empty");
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST, "target 
nodes should not be empty");
                 return;
             }
 
             // If a user gives parameter datasetName, she should give 
dataverseName as well.
             if (dataverseName == null && datasetName != null) {
-                sendResponse(out, jsonResponse, response, 
HttpResponseStatus.BAD_REQUEST,
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST,
                         "to rebalance a particular dataset, the parameter 
dataverseName must be given");
                 return;
             }
 
             // Does not allow rebalancing a metadata dataset.
             if (METADATA.equals(dataverseName)) {
-                sendResponse(out, jsonResponse, response, 
HttpResponseStatus.BAD_REQUEST,
-                        "cannot rebalance a metadata dataset");
+                sendResponse(response, HttpResponseStatus.BAD_REQUEST, "cannot 
rebalance a metadata dataset");
                 return;
             }
+            // Schedules a rebalance task and wait for its completion.
+            CountDownLatch terminated = scheduleRebalance(dataverseName, 
datasetName, targetNodes, response);
+            terminated.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, 
"the rebalance service is interrupted", e);
+        }
+    }
+
+    // Cancels all rebalance tasks.
+    private synchronized void cancelRebalance() throws InterruptedException {
+        for (Future rebalanceTask : rebalanceTasks) {
+            rebalanceTask.cancel(true);
+        }
+    }
+
+    // Removes a terminated task and its termination latch -- the heads.
+    private synchronized void removeTermintedTask() {
+        rebalanceTasks.remove();
+        rebalanceFutureTerminated.remove();
+    }
+
+    // Schedules a rebalance task.
+    private synchronized CountDownLatch scheduleRebalance(String 
dataverseName, String datasetName,
+            String[] targetNodes, IServletResponse response) {
+        CountDownLatch terminated = new CountDownLatch(1);
+        Future task = executor.submit(() -> doRebalance(dataverseName, 
datasetName, targetNodes, response, terminated));
+        rebalanceTasks.add(task);
+        rebalanceFutureTerminated.add(terminated);
+        return terminated;
+    }
+
+    // Performs the actual rebalance.
+    private void doRebalance(String dataverseName, String datasetName, 
String[] targetNodes, IServletResponse response,
+            CountDownLatch terminated) {
+        try {
+            // Sets the content type.
+            HttpUtil.setContentType(response, 
HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8);
 
             if (datasetName == null) {
                 // Rebalances datasets in a given dataverse or all 
non-metadata datasets.
@@ -123,10 +183,19 @@
             }
 
             // Sends response.
-            sendResponse(out, jsonResponse, response, HttpResponseStatus.OK, 
"successful");
+            sendResponse(response, HttpResponseStatus.OK, "successful");
+        } catch (InterruptedException e) {
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                    "the rebalance task is cancelled by a user", e);
         } catch (Exception e) {
-            sendResponse(out, jsonResponse, response, 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
-            LOGGER.log(Level.WARNING, e.getMessage(), e);
+            sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, 
e.toString(), e);
+        } finally {
+            // Removes the heads of the task queue and the latch queue.
+            // Since the ExecutorService is one-at-a-time, the execution order 
of rebalance tasks is
+            // the same as the request submission order.
+            removeTermintedTask();
+            // Notify that the rebalance task is terminated.
+            terminated.countDown();
         }
     }
 
@@ -177,10 +246,24 @@
     }
 
     // Sends HTTP response to the request client.
-    private void sendResponse(PrintWriter out, ObjectNode jsonResponse, 
IServletResponse response,
-            HttpResponseStatus status, String message) {
+    private void sendResponse(IServletResponse response, HttpResponseStatus 
status, String message, Exception e) {
+        if (status != HttpResponseStatus.OK) {
+            if (e != null) {
+                LOGGER.log(Level.WARNING, message, e);
+            } else {
+                LOGGER.log(Level.WARNING, message);
+            }
+        }
+        PrintWriter out = response.writer();
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jsonResponse = om.createObjectNode();
         jsonResponse.put("results", message);
         response.setStatus(status);
         out.write(jsonResponse.toString());
     }
+
+    // Sends HTTP response to the request client.
+    private void sendResponse(IServletResponse response, HttpResponseStatus 
status, String message) {
+        sendResponse(response, status, message, null);
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 691be50..275b055 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -65,6 +65,7 @@
 import 
org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -691,7 +692,15 @@
             } else if (logRecord.getNewOp() == 
AbstractIndexModificationOperationCallback.UPSERT_BYTE) {
                 // undo, upsert the old value if found, otherwise, physical 
delete
                 if (logRecord.getOldValue() == null) {
-                    indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+                    try {
+                        
indexAccessor.forcePhysicalDelete(logRecord.getNewValue());
+                    } catch (HyracksDataException hde) {
+                        // Since we're undoing according the write-ahead log, 
the actual upserting tuple
+                        // might not have been written to memory yet.
+                        if (hde.getErrorCode() != 
ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) {
+                            throw hde;
+                        }
+                    }
                 } else {
                     indexAccessor.forceUpsert(logRecord.getOldValue());
                 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index dae88c2..3b17a94 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -24,6 +24,8 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.IntStream;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -61,6 +63,7 @@
  * A utility class for the rebalance operation.
  */
 public class RebalanceUtil {
+    private static final Logger LOGGER = 
Logger.getLogger(RebalanceUtil.class.getName());
 
     private RebalanceUtil() {
 
@@ -83,12 +86,13 @@
      */
     public static void rebalance(String dataverseName, String datasetName, 
Set<String> targetNcNames,
             MetadataProvider metadataProvider, IHyracksClientConnection hcc) 
throws Exception {
-        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
         Dataset sourceDataset;
         Dataset targetDataset;
+        // Executes the first Metadata transaction.
         // Generates the rebalance target files. While doing that, hold read 
locks on the dataset so
         // that no one can drop the rebalance source dataset.
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
             // The source dataset.
             sourceDataset = metadataProvider.findDataset(dataverseName, 
datasetName);
@@ -125,13 +129,57 @@
             metadataProvider.getLocks().reset();
         }
 
-        // Starts another transaction for switching the metadata entity.
-        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        // Up to this point, since the bulk part of a rebalance operation is 
done,
+        // the following two operations will retry after interrupt and finally 
rethrow InterruptedException,
+        // which means that they will always succeed and could possibly throw 
InterruptedException as the last step.
+        // TODO(yingyi): ASTERIXDB-1948, in case a crash happens, currently 
the system will either:
+        // 1. (crash before metadata switch) think the rebalance is not done, 
and the target data files are leaked until
+        // the next rebalance request.
+        // 2. (crash after metadata switch) think the rebalance is done, and 
the source data files are leaked;
+        runWithRetryAfterInterrupt(() -> {
+            // Executes the 2nd Metadata transaction for switching the 
metadata entity.
+            // It detaches the source dataset and attaches the target dataset 
to metadata's point of view.
+            runMetadataTransaction(metadataProvider,
+                () -> rebalanceSwitch(sourceDataset, targetDataset, 
metadataProvider, hcc));
+            // Executes the 3rd Metadata transaction to drop the source 
dataset files and the node group for
+            // the source dataset.
+            runMetadataTransaction(metadataProvider, () -> 
dropSourceDataset(sourceDataset, metadataProvider, hcc));
+        });
+    }
+
+    @FunctionalInterface
+    private interface Work {
+        void run() throws Exception;
+    }
+
+    // Runs works.run() and lets it sustain interrupts.
+    private static void runWithRetryAfterInterrupt(Work work) throws Exception 
{
+        int retryCount = 0;
+        InterruptedException interruptedException = null;
+        boolean done = false;
+        do {
+            try {
+                work.run();
+                done = true;
+            } catch (InterruptedException e) {
+                LOGGER.log(Level.WARNING, "Retry with attempt " + 
(++retryCount), e);
+                interruptedException = e;
+            }
+        } while (!done);
+
+        // Rethrows the interrupted exception.
+        if (interruptedException != null) {
+            throw interruptedException;
+        }
+    }
+
+    // Executes a metadata transaction.
+    private static void runMetadataTransaction(MetadataProvider 
metadataProvider, Work work) throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         try {
-            // Atomically switches the rebalance target to become the source 
dataset.
-            rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, 
hcc);
-
+            // Performs the actual work.
+            work.run();
             // Complete the metadata transaction.
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
@@ -145,6 +193,9 @@
     // Rebalances from the source to the target.
     private static void rebalance(Dataset source, Dataset target, 
MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
+        // Drops the target dataset files (if any) to make rebalance 
idempotent.
+        dropDatasetFiles(target, metadataProvider, hcc);
+
         // Creates the rebalance target.
         createRebalanceTarget(target, metadataProvider, hcc);
 
@@ -155,6 +206,7 @@
         createAndLoadSecondaryIndexesForTarget(source, target, 
metadataProvider, hcc);
     }
 
+    // Switches the metadata entity from the source dataset to the target 
dataset.
     private static void rebalanceSwitch(Dataset source, Dataset target, 
MetadataProvider metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         MetadataTransactionContext mdTxnCtx = 
metadataProvider.getMetadataTxnContext();
@@ -164,6 +216,7 @@
 
         Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx, 
source.getDataverseName(),
                 source.getDatasetName());
+
         if (sourceDataset == null) {
             // The dataset has already been dropped.
             // In this case, we should drop the generated target dataset files.
@@ -171,17 +224,23 @@
             return;
         }
 
-        // Drops the source dataset files.
-        dropDatasetFiles(source, metadataProvider, hcc);
-
         // Updates the dataset entry in the metadata storage
         MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
+    }
+
+    // Drops the source dataset.
+    private static void dropSourceDataset(Dataset source, MetadataProvider 
metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        // Drops the source dataset files. No need to lock the dataset entity 
here because the source dataset has
+        // been detached at this point.
+        dropDatasetFiles(source, metadataProvider, hcc);
 
         // Drops the metadata entry of source dataset's node group.
         String sourceNodeGroup = source.getNodeGroupName();
         
MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(),
 sourceNodeGroup);
-        MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, sourceNodeGroup, 
true);
+        
MetadataManager.INSTANCE.dropNodegroup(metadataProvider.getMetadataTxnContext(),
 sourceNodeGroup, true);
     }
+
 
     // Creates the files for the rebalance target dataset.
     private static void createRebalanceTarget(Dataset target, MetadataProvider 
metadataProvider,
@@ -254,12 +313,13 @@
                 new RecordDescriptor[] { 
target.getPrimaryRecordDescriptor(metadataProvider) });
     }
 
+    // Drops dataset files of a given dataset.
     private static void dropDatasetFiles(Dataset dataset, MetadataProvider 
metadataProvider,
             IHyracksClientConnection hcc) throws Exception {
         List<JobSpecification> jobs = new ArrayList<>();
         List<Index> indexes = 
metadataProvider.getDatasetIndexes(dataset.getDataverseName(), 
dataset.getDatasetName());
         for (Index index : indexes) {
-            jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, 
dataset));
+            jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, 
dataset, true));
         }
         for (JobSpecification jobSpec : jobs) {
             JobUtils.runJob(hcc, jobSpec, true);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
index 50c2986..b4f9ded 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.Future;
 import java.util.function.Predicate;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest;
 import org.apache.asterix.testframework.context.TestCaseContext;
@@ -104,7 +105,7 @@
                 return false;
             }
         }
-        String errorMsg = getErrorMessage(e);
+        String errorMsg = ExceptionUtils.getErrorMessage(e);
         // Expected, "HYR0025" means a user cancelled the query.)
         if (errorMsg.startsWith("HYR0025")) {
             SqlppExecutionWithCancellationTest.numCancelledQueries++;
@@ -114,22 +115,5 @@
             System.err.println("Expected to find one of the following in error 
text:\n+++++\n" + expectedErrors + "\n+++++");
             return true;
         }
-    }
-
-    public static String getErrorMessage(Throwable th) {
-        Throwable cause = getRootCause(th);
-        return cause.getMessage();
-    }
-
-    // Finds the root cause of Throwable.
-    private static Throwable getRootCause(Throwable e) {
-        Throwable current = e;
-        Throwable cause = e.getCause();
-        while (cause != null && cause != current) {
-            Throwable nextCause = current.getCause();
-            current = cause;
-            cause = nextCause;
-        }
-        return current;
     }
 }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
new file mode 100644
index 0000000..a63cb76
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.common;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+
+import org.apache.asterix.common.exceptions.ExceptionUtils;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ComparisonEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.junit.Assert;
+
+public class RebalanceCancellationTestExecutor extends TestExecutor {
+
+    private final ExecutorService executor = 
Executors.newSingleThreadExecutor();
+    private long waitTime = 100;
+
+    public void setWaitTime(long waitTime) {
+        this.waitTime = waitTime;
+    }
+
+    @Override
+    protected void executeHttpRequest(TestCaseContext.OutputFormat fmt, String 
statement,
+            Map<String, Object> variableCtx, String reqType, File testFile, 
File expectedResultFile,
+            File actualResultFile, MutableInt queryCount, int numResultFiles, 
String extension, ComparisonEnum compare)
+            throws Exception {
+        // Executes regular tests as usual.
+        if (!(testFile.getAbsolutePath().endsWith("post.http") && 
statement.contains("rebalance"))) {
+            super.executeHttpRequest(fmt, statement, variableCtx, reqType, 
testFile, expectedResultFile,
+                    actualResultFile, queryCount, numResultFiles, extension, 
compare);
+            return;
+        }
+
+        // Executes rebalance tests with cancellation.
+        Future<Exception> future = executor.submit(() -> {
+            //boolean failed = false;
+            try {
+                super.executeHttpRequest(fmt, statement, variableCtx, reqType, 
testFile, expectedResultFile,
+                        actualResultFile, queryCount, numResultFiles, 
extension, compare);
+            } catch (Exception e) {
+                // Since Hyracks job cancellation is not synchronous, 
re-executing rebalance could
+                // fail, but we keep retrying until it completes.
+                boolean done = false;
+                do {
+                    try {
+                        // Re-executes rebalance.
+                        super.executeHttpRequest(fmt, statement, variableCtx, 
reqType, testFile, expectedResultFile,
+                                actualResultFile, queryCount, numResultFiles, 
extension, compare);
+                        done = true;
+                    } catch (Exception e2) {
+                        String errorMsg = ExceptionUtils.getErrorMessage(e2);
+                        // not expected, but is a false alarm.
+                        if (errorMsg == null || !errorMsg.contains("reference 
count = 1")) {
+                            return e2;
+                        }
+                        LOGGER.log(Level.WARNING, e2.toString(), e2);
+                    }
+                } while (!done);
+            }
+            return null;
+        });
+        Thread.sleep(waitTime);
+        // Cancels the query request while the query is executing.
+        int rc = cancelQuery(getEndpoint(Servlets.REBALANCE), 
Collections.emptyList());
+        Assert.assertTrue(rc == 200 || rc == 404);
+        Exception e = future.get();
+        if (e != null) {
+            throw e;
+        }
+    }
+
+    // Cancels a submitted query through the cancellation REST API.
+    private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> 
params) throws Exception {
+        HttpUriRequest method = constructDeleteMethodUrl(uri, params);
+        HttpResponse response = executeHttpRequest(method);
+        return response.getStatusLine().getStatusCode();
+    }
+
+    // Constructs a HTTP DELETE request.
+    private HttpUriRequest constructDeleteMethodUrl(URI uri, 
List<TestCase.CompilationUnit.Parameter> otherParams) {
+        RequestBuilder builder = RequestBuilder.delete(uri);
+        for (TestCase.CompilationUnit.Parameter param : otherParams) {
+            builder.addParameter(param.getName(), param.getValue());
+        }
+        builder.setCharset(StandardCharsets.UTF_8);
+        return builder.build();
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index ed6a77a..8791756 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1058,9 +1058,9 @@
         }
     }
 
-    private void executeHttpRequest(OutputFormat fmt, String statement, 
Map<String, Object> variableCtx, String reqType,
-            File testFile, File expectedResultFile, File actualResultFile, 
MutableInt queryCount, int numResultFiles,
-            String extension, ComparisonEnum compare) throws Exception {
+    protected void executeHttpRequest(OutputFormat fmt, String statement, 
Map<String, Object> variableCtx,
+            String reqType, File testFile, File expectedResultFile, File 
actualResultFile, MutableInt queryCount,
+            int numResultFiles, String extension, ComparisonEnum compare) 
throws Exception {
         String handleVar = getHandleVariable(statement);
         final String trimmedPathAndQuery = 
stripLineComments(stripJavaComments(statement)).trim();
         final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, 
variableCtx);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
new file mode 100644
index 0000000..1d7bdc3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.RebalanceCancellationTestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs rebalance tests with cancellation.
+ */
+@RunWith(Parameterized.class)
+public class RebalanceWithCancellationIT {
+    protected static final String TEST_CONFIG_FILE_NAME = 
"asterix-build-configuration.xml";
+    private static RebalanceCancellationTestExecutor executor = new 
RebalanceCancellationTestExecutor();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, executor);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "RebalanceWithCancellationIT {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_sqlpp.xml", "rebalance.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public RebalanceWithCancellationIT(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        // Runs each single cancellation test multiple times and tests 
cancellation at various points of time.
+        for (int waitTime = 100; waitTime <= 1000; waitTime += 50) {
+            executor.setWaitTime(waitTime);
+            LangExecutionUtil.test(tcCtx);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
index cce069c..fff0775 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java
@@ -20,6 +20,7 @@
 
 import java.util.Collection;
 
+import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.test.common.CancellationTestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
@@ -70,7 +71,7 @@
         try {
             LangExecutionUtil.test(tcCtx);
         } catch (Exception e) {
-            String errorMsg = CancellationTestExecutor.getErrorMessage(e);
+            String errorMsg = ExceptionUtils.getErrorMessage(e);
             if (!errorMsg.contains("reference count = 1") // not expected, but 
is a false alarm.
                     && !errorMsg.contains("pinned and file is being closed") 
// not expected, but maybe a false alarm.
             ) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 51a535a..3da58e9 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -263,19 +264,30 @@
 
     @Override
     public synchronized void close(String resourcePath) throws 
HyracksDataException {
-        validateDatasetLifecycleManagerState();
-        int did = getDIDfromResourcePath(resourcePath);
-        long resourceID = getResourceIDfromResourcePath(resourcePath);
-        DatasetResource dsr = datasets.get(did);
-        if (dsr == null) {
-            throw new HyracksDataException("No index found with resourceID " + 
resourceID);
+        DatasetResource dsr = null;
+        IndexInfo iInfo = null;
+        try {
+            validateDatasetLifecycleManagerState();
+            int did = getDIDfromResourcePath(resourcePath);
+            long resourceID = getResourceIDfromResourcePath(resourcePath);
+            dsr = datasets.get(did);
+            if (dsr == null) {
+                throw 
HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, 
resourceID);
+            }
+            iInfo = dsr.getIndexInfo(resourceID);
+            if (iInfo == null) {
+                throw 
HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, 
resourceID);
+            }
+        } finally {
+            // Regardless of what exception is thrown in the try-block (e.g., 
line 279),
+            // we have to un-touch the index and dataset.
+            if (iInfo != null) {
+                iInfo.untouch();
+            }
+            if (dsr != null) {
+                dsr.untouch();
+            }
         }
-        IndexInfo iInfo = dsr.getIndexInfo(resourceID);
-        if (iInfo == null) {
-            throw new HyracksDataException("No index found with resourceID " + 
resourceID);
-        }
-        iInfo.untouch();
-        dsr.untouch();
     }
 
     @Override
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index 649f1f5..3105b3f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -20,7 +20,6 @@
 
 public class ExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
-    public static final String MISSING_PARAMETER = "Missing parameter.\n";
     public static final String PARAMETER_NAME = "Parameter name: ";
     public static final String EXPECTED_VALUE = "Expected value: ";
     public static final String PASSED_VALUE = "Passed value: ";
@@ -32,4 +31,22 @@
         return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + 
System.lineSeparator() + EXPECTED_VALUE
                 + expectedValue + System.lineSeparator() + PASSED_VALUE + 
passedValue;
     }
+
+    // Gets the error message for the root cause of a given Throwable instance.
+    public static String getErrorMessage(Throwable th) {
+        Throwable cause = getRootCause(th);
+        return cause.getMessage();
+    }
+
+    // Finds the root cause of a given Throwable instance.
+    public static Throwable getRootCause(Throwable e) {
+        Throwable current = e;
+        Throwable cause = e.getCause();
+        while (cause != null && cause != current) {
+            Throwable nextCause = current.getCause();
+            current = cause;
+            cause = nextCause;
+        }
+        return current;
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 9e55a97..411f866 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -103,7 +103,14 @@
             Dataset dataset) throws AlgebricksException {
         SecondaryIndexOperationsHelper secondaryIndexHelper = 
SecondaryIndexOperationsHelper
                 .createIndexOperationsHelper(dataset, index, metadataProvider, 
physicalOptimizationConfig);
-        return secondaryIndexHelper.buildDropJobSpec();
+        return secondaryIndexHelper.buildDropJobSpec(false);
+    }
+
+    public static JobSpecification buildDropIndexJobSpec(Index index, 
MetadataProvider metadataProvider,
+            Dataset dataset, boolean failSilently) throws AlgebricksException {
+        SecondaryIndexOperationsHelper secondaryIndexHelper = 
SecondaryIndexOperationsHelper
+                .createIndexOperationsHelper(dataset, index, metadataProvider, 
physicalOptimizationConfig);
+        return secondaryIndexHelper.buildDropJobSpec(failSilently);
     }
 
     public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset 
dataset, Index index,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index d11ba21..8c45c11 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -170,7 +170,7 @@
 
     public abstract JobSpecification buildCompactJobSpec() throws 
AlgebricksException;
 
-    public abstract JobSpecification buildDropJobSpec() throws 
AlgebricksException;
+    public abstract JobSpecification buildDropJobSpec(boolean failSilently) 
throws AlgebricksException;
 
     protected void init() throws AlgebricksException {
         payloadSerde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 907192c..2dcab4f 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -66,14 +66,15 @@
     }
 
     @Override
-    public JobSpecification buildDropJobSpec() throws AlgebricksException {
+    public JobSpecification buildDropJobSpec(boolean failSilently) throws 
AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
                 metadataProvider.getSplitProviderAndConstraints(dataset, 
index.getIndexName());
         IIndexDataflowHelperFactory dataflowHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
splitsAndConstraint.first);
         // The index drop operation should be persistent regardless of temp 
datasets or permanent dataset.
-        IndexDropOperatorDescriptor btreeDrop = new 
IndexDropOperatorDescriptor(spec, dataflowHelperFactory);
+        IndexDropOperatorDescriptor btreeDrop = new 
IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
+                failSilently);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
btreeDrop,
                 splitsAndConstraint.second);
         spec.addRoot(btreeDrop);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cb03ae4..e530bc3 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -213,13 +213,18 @@
     public synchronized void delete(String relativePath) throws 
HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, 
relativePath);
         if (resourceFile.getFile().exists()) {
-            resourceFile.delete();
-            resourceCache.invalidate(relativePath);
-
-            //if replication enabled, delete resource from remote replicas
-            if (isReplicationEnabled
-                    && 
!resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX))
 {
-                createReplicationJob(ReplicationOperation.DELETE, 
resourceFile);
+            try {
+                // Invalidate before deleting the file just in case file 
deletion throws some exception.
+                // Since it's just a cache invalidation, it should not affect 
correctness.
+                resourceCache.invalidate(relativePath);
+                resourceFile.delete();
+            } finally {
+                // Regardless of successfully deleted or not, the operation 
should be replicated.
+                //if replication enabled, delete resource from remote replicas
+                if (isReplicationEnabled
+                        && 
!resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX))
 {
+                    createReplicationJob(ReplicationOperation.DELETE, 
resourceFile);
+                }
             }
         } else {
             throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 8a91547..2e6c8a3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -116,6 +116,7 @@
     public static final int CANNOT_CREATE_EXISTING_INDEX = 80;
     public static final int FILE_ALREADY_MAPPED = 81;
     public static final int FILE_ALREADY_EXISTS = 82;
+    public static final int NO_INDEX_FOUND_WITH_RESOURCE_ID = 83;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 9f983a7..6389ffa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -99,5 +99,6 @@
 80 = Cannot create index because it already exists
 81 = File %1$s is already mapped
 82 = Failed to create the file %1$s because it already exists
+83 = No index found with resourceID %1$s
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index d162be0..18c7107 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -30,16 +30,23 @@
 
     private static final long serialVersionUID = 1L;
     private final IIndexDataflowHelperFactory dataflowHelperFactory;
+    private final boolean failSilently;
 
     public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
             IIndexDataflowHelperFactory dataflowHelperFactory) {
+        this(spec, dataflowHelperFactory, false);
+    }
+
+    public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            IIndexDataflowHelperFactory dataflowHelperFactory, boolean 
failSilently) {
         super(spec, 0, 0);
         this.dataflowHelperFactory = dataflowHelperFactory;
+        this.failSilently = failSilently;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
-        return new IndexDropOperatorNodePushable(dataflowHelperFactory, ctx, 
partition);
+        return new IndexDropOperatorNodePushable(dataflowHelperFactory, 
failSilently, ctx, partition);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index fce31ca..7c2021b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -28,10 +28,12 @@
 
 public class IndexDropOperatorNodePushable extends 
AbstractOperatorNodePushable {
     private final IIndexDataflowHelper indexHelper;
+    private final boolean failSliently;
 
-    public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory 
indexHelperFactory, IHyracksTaskContext ctx,
-            int partition) throws HyracksDataException {
+    public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory 
indexHelperFactory, boolean failSilently,
+            IHyracksTaskContext ctx, int partition) throws 
HyracksDataException {
         this.indexHelper = 
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
+        this.failSliently = failSilently;
     }
 
     @Override
@@ -50,7 +52,13 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        indexHelper.destroy();
+        try {
+            indexHelper.destroy();
+        } catch (HyracksDataException e) {
+            if (!failSliently) {
+                throw e;
+            }
+        }
     }
 
     @Override

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1821
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7
Gerrit-PatchSet: 15
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <buyin...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to