Yingyi Bu has submitted this change and it was merged. Change subject: [ASTERIXDB-1943][API][STO] Make rebalance idempotent. ......................................................................
[ASTERIXDB-1943][API][STO] Make rebalance idempotent. - user model changes: added rebalance cancellation HTTP API. - storage format changes: no - interface changes: no Details: - add an HTTP API for cancelling a rebalance request; - clean up leftover states at the beginning of a rebalance request; - add tests for rebalance cancellation. Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1821 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java 18 files changed, 464 insertions(+), 79 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java index 53d8f6b..3bd1be5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java @@ -21,12 +21,18 @@ import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.PrintWriter; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; +import java.util.Queue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; @@ -61,20 +67,39 @@ private static final String METADATA = "Metadata"; private final ICcApplicationContext appCtx; + // One-at-a-time 
thread executor, for rebalance tasks. + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + // A queue that maintains submitted rebalance requests. + private final Queue<Future> rebalanceTasks = new ArrayDeque<>(); + + // A queue that tracks the termination of rebalance threads. + private final Queue<CountDownLatch> rebalanceFutureTerminated = new ArrayDeque<>(); + public RebalanceApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) { super(ctx, paths); this.appCtx = appCtx; } @Override - protected void post(IServletRequest request, IServletResponse response) { - PrintWriter out = response.writer(); - ObjectMapper om = new ObjectMapper(); - ObjectNode jsonResponse = om.createObjectNode(); + protected void delete(IServletRequest request, IServletResponse response) { try { // Sets the content type. HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); + // Cancels all rebalance requests. + cancelRebalance(); + // Sends the response back. + sendResponse(response, HttpResponseStatus.OK, "rebalance tasks are cancelled"); + } catch (Exception e) { + // Sends back and logs internal error if any exception happens during cancellation. + sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage(), e); + } + } + + @Override + protected void post(IServletRequest request, IServletResponse response) { + try { // Gets dataverse, dataset, and target nodes for rebalance. String dataverseName = request.getParameter("dataverseName"); String datasetName = request.getParameter("datasetName"); @@ -82,31 +107,66 @@ // Parses and check target nodes. 
if (nodes == null) { - sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST, - "nodes are not given"); + sendResponse(response, HttpResponseStatus.BAD_REQUEST, "nodes are not given"); return; } String nodesString = StringUtils.strip(nodes, "\"'").trim(); String[] targetNodes = nodesString.split(","); if ("".equals(nodesString)) { - sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST, - "target nodes should not be empty"); + sendResponse(response, HttpResponseStatus.BAD_REQUEST, "target nodes should not be empty"); return; } // If a user gives parameter datasetName, she should give dataverseName as well. if (dataverseName == null && datasetName != null) { - sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST, + sendResponse(response, HttpResponseStatus.BAD_REQUEST, "to rebalance a particular dataset, the parameter dataverseName must be given"); return; } // Does not allow rebalancing a metadata dataset. if (METADATA.equals(dataverseName)) { - sendResponse(out, jsonResponse, response, HttpResponseStatus.BAD_REQUEST, - "cannot rebalance a metadata dataset"); + sendResponse(response, HttpResponseStatus.BAD_REQUEST, "cannot rebalance a metadata dataset"); return; } + // Schedules a rebalance task and wait for its completion. + CountDownLatch terminated = scheduleRebalance(dataverseName, datasetName, targetNodes, response); + terminated.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, "the rebalance service is interrupted", e); + } + } + + // Cancels all rebalance tasks. + private synchronized void cancelRebalance() throws InterruptedException { + for (Future rebalanceTask : rebalanceTasks) { + rebalanceTask.cancel(true); + } + } + + // Removes a terminated task and its termination latch -- the heads. 
+ private synchronized void removeTermintedTask() { + rebalanceTasks.remove(); + rebalanceFutureTerminated.remove(); + } + + // Schedules a rebalance task. + private synchronized CountDownLatch scheduleRebalance(String dataverseName, String datasetName, + String[] targetNodes, IServletResponse response) { + CountDownLatch terminated = new CountDownLatch(1); + Future task = executor.submit(() -> doRebalance(dataverseName, datasetName, targetNodes, response, terminated)); + rebalanceTasks.add(task); + rebalanceFutureTerminated.add(terminated); + return terminated; + } + + // Performs the actual rebalance. + private void doRebalance(String dataverseName, String datasetName, String[] targetNodes, IServletResponse response, + CountDownLatch terminated) { + try { + // Sets the content type. + HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, HttpUtil.Encoding.UTF8); if (datasetName == null) { // Rebalances datasets in a given dataverse or all non-metadata datasets. @@ -123,10 +183,19 @@ } // Sends response. - sendResponse(out, jsonResponse, response, HttpResponseStatus.OK, "successful"); + sendResponse(response, HttpResponseStatus.OK, "successful"); + } catch (InterruptedException e) { + sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, + "the rebalance task is cancelled by a user", e); } catch (Exception e) { - sendResponse(out, jsonResponse, response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); - LOGGER.log(Level.WARNING, e.getMessage(), e); + sendResponse(response, HttpResponseStatus.INTERNAL_SERVER_ERROR, e.toString(), e); + } finally { + // Removes the heads of the task queue and the latch queue. + // Since the ExecutorService is one-at-a-time, the execution order of rebalance tasks is + // the same as the request submission order. + removeTermintedTask(); + // Notify that the rebalance task is terminated. + terminated.countDown(); } } @@ -177,10 +246,24 @@ } // Sends HTTP response to the request client. 
- private void sendResponse(PrintWriter out, ObjectNode jsonResponse, IServletResponse response, - HttpResponseStatus status, String message) { + private void sendResponse(IServletResponse response, HttpResponseStatus status, String message, Exception e) { + if (status != HttpResponseStatus.OK) { + if (e != null) { + LOGGER.log(Level.WARNING, message, e); + } else { + LOGGER.log(Level.WARNING, message); + } + } + PrintWriter out = response.writer(); + ObjectMapper om = new ObjectMapper(); + ObjectNode jsonResponse = om.createObjectNode(); jsonResponse.put("results", message); response.setStatus(status); out.write(jsonResponse.toString()); } + + // Sends HTTP response to the request client. + private void sendResponse(IServletResponse response, HttpResponseStatus status, String message) { + sendResponse(response, status, message, null); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 691be50..275b055 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -65,6 +65,7 @@ import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; @@ -691,7 +692,15 @@ } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) { // undo, upsert the old value if found, otherwise, physical delete if (logRecord.getOldValue() == null) { - indexAccessor.forcePhysicalDelete(logRecord.getNewValue()); + 
try { + indexAccessor.forcePhysicalDelete(logRecord.getNewValue()); + } catch (HyracksDataException hde) { + // Since we're undoing according to the write-ahead log, the actual upserting tuple + // might not have been written to memory yet. + if (hde.getErrorCode() != ErrorCode.UPDATE_OR_DELETE_NON_EXISTENT_KEY) { + throw hde; + } + } } else { indexAccessor.forceUpsert(logRecord.getOldValue()); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index dae88c2..3b17a94 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -24,6 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.IntStream; import org.apache.asterix.common.exceptions.AsterixException; @@ -61,6 +63,7 @@ * A utility class for the rebalance operation. */ public class RebalanceUtil { + private static final Logger LOGGER = Logger.getLogger(RebalanceUtil.class.getName()); private RebalanceUtil() { @@ -83,12 +86,13 @@ */ public static void rebalance(String dataverseName, String datasetName, Set<String> targetNcNames, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); Dataset sourceDataset; Dataset targetDataset; + // Executes the first Metadata transaction. // Generates the rebalance target files. While doing that, hold read locks on the dataset so // that no one can drop the rebalance source dataset. + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); try { // The source dataset.
sourceDataset = metadataProvider.findDataset(dataverseName, datasetName); @@ -125,13 +129,57 @@ metadataProvider.getLocks().reset(); } - // Starts another transaction for switching the metadata entity. - mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + // Up to this point, since the bulk part of a rebalance operation is done, + // the following two operations will retry after interrupt and finally rethrow InterruptedException, + // which means that they will always succeed and could possibly throw InterruptedException as the last step. + // TODO(yingyi): ASTERIXDB-1948, in case a crash happens, currently the system will either: + // 1. (crash before metadata switch) think the rebalance is not done, and the target data files are leaked until + // the next rebalance request. + // 2. (crash after metadata switch) think the rebalance is done, and the source data files are leaked; + runWithRetryAfterInterrupt(() -> { + // Executes the 2nd Metadata transaction for switching the metadata entity. + // It detaches the source dataset and attaches the target dataset to metadata's point of view. + runMetadataTransaction(metadataProvider, + () -> rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc)); + // Executes the 3rd Metadata transaction to drop the source dataset files and the node group for + // the source dataset. + runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc)); + }); + } + + @FunctionalInterface + private interface Work { + void run() throws Exception; + } + + // Runs works.run() and lets it sustain interrupts. 
+ private static void runWithRetryAfterInterrupt(Work work) throws Exception { + int retryCount = 0; + InterruptedException interruptedException = null; + boolean done = false; + do { + try { + work.run(); + done = true; + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "Retry with attempt " + (++retryCount), e); + interruptedException = e; + } + } while (!done); + + // Rethrows the interrupted exception. + if (interruptedException != null) { + throw interruptedException; + } + } + + // Executes a metadata transaction. + private static void runMetadataTransaction(MetadataProvider metadataProvider, Work work) throws Exception { + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); try { - // Atomically switches the rebalance target to become the source dataset. - rebalanceSwitch(sourceDataset, targetDataset, metadataProvider, hcc); - + // Performs the actual work. + work.run(); // Complete the metadata transaction. MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); } catch (Exception e) { @@ -145,6 +193,9 @@ // Rebalances from the source to the target. private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { + // Drops the target dataset files (if any) to make rebalance idempotent. + dropDatasetFiles(target, metadataProvider, hcc); + // Creates the rebalance target. createRebalanceTarget(target, metadataProvider, hcc); @@ -155,6 +206,7 @@ createAndLoadSecondaryIndexesForTarget(source, target, metadataProvider, hcc); } + // Switches the metadata entity from the source dataset to the target dataset. 
private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); @@ -164,6 +216,7 @@ Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx, source.getDataverseName(), source.getDatasetName()); + if (sourceDataset == null) { // The dataset has already been dropped. // In this case, we should drop the generated target dataset files. @@ -171,17 +224,23 @@ return; } - // Drops the source dataset files. - dropDatasetFiles(source, metadataProvider, hcc); - // Updates the dataset entry in the metadata storage MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target); + } + + // Drops the source dataset. + private static void dropSourceDataset(Dataset source, MetadataProvider metadataProvider, + IHyracksClientConnection hcc) throws Exception { + // Drops the source dataset files. No need to lock the dataset entity here because the source dataset has + // been detached at this point. + dropDatasetFiles(source, metadataProvider, hcc); // Drops the metadata entry of source dataset's node group. String sourceNodeGroup = source.getNodeGroupName(); MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), sourceNodeGroup); - MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, sourceNodeGroup, true); + MetadataManager.INSTANCE.dropNodegroup(metadataProvider.getMetadataTxnContext(), sourceNodeGroup, true); } + // Creates the files for the rebalance target dataset. private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider, @@ -254,12 +313,13 @@ new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) }); } + // Drops dataset files of a given dataset. 
private static void dropDatasetFiles(Dataset dataset, MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception { List<JobSpecification> jobs = new ArrayList<>(); List<Index> indexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()); for (Index index : indexes) { - jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset)); + jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true)); } for (JobSpecification jobSpec : jobs) { JobUtils.runJob(hcc, jobSpec, true); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java index 50c2986..b4f9ded 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/CancellationTestExecutor.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import java.util.function.Predicate; +import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.test.runtime.SqlppExecutionWithCancellationTest; import org.apache.asterix.testframework.context.TestCaseContext; @@ -104,7 +105,7 @@ return false; } } - String errorMsg = getErrorMessage(e); + String errorMsg = ExceptionUtils.getErrorMessage(e); // Expected, "HYR0025" means a user cancelled the query.) if (errorMsg.startsWith("HYR0025")) { SqlppExecutionWithCancellationTest.numCancelledQueries++; @@ -114,22 +115,5 @@ System.err.println("Expected to find one of the following in error text:\n+++++\n" + expectedErrors + "\n+++++"); return true; } - } - - public static String getErrorMessage(Throwable th) { - Throwable cause = getRootCause(th); - return cause.getMessage(); - } - - // Finds the root cause of Throwable. 
- private static Throwable getRootCause(Throwable e) { - Throwable current = e; - Throwable cause = e.getCause(); - while (cause != null && cause != current) { - Throwable nextCause = current.getCause(); - current = cause; - cause = nextCause; - } - return current; } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java new file mode 100644 index 0000000..a63cb76 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/RebalanceCancellationTestExecutor.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.test.common; + +import java.io.File; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.logging.Level; + +import org.apache.asterix.common.exceptions.ExceptionUtils; +import org.apache.asterix.common.utils.Servlets; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.asterix.testframework.xml.ComparisonEnum; +import org.apache.asterix.testframework.xml.TestCase; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.methods.RequestBuilder; +import org.junit.Assert; + +public class RebalanceCancellationTestExecutor extends TestExecutor { + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private long waitTime = 100; + + public void setWaitTime(long waitTime) { + this.waitTime = waitTime; + } + + @Override + protected void executeHttpRequest(TestCaseContext.OutputFormat fmt, String statement, + Map<String, Object> variableCtx, String reqType, File testFile, File expectedResultFile, + File actualResultFile, MutableInt queryCount, int numResultFiles, String extension, ComparisonEnum compare) + throws Exception { + // Executes regular tests as usual. + if (!(testFile.getAbsolutePath().endsWith("post.http") && statement.contains("rebalance"))) { + super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile, + actualResultFile, queryCount, numResultFiles, extension, compare); + return; + } + + // Executes rebalance tests with cancellation. 
+ Future<Exception> future = executor.submit(() -> { + //boolean failed = false; + try { + super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile, + actualResultFile, queryCount, numResultFiles, extension, compare); + } catch (Exception e) { + // Since Hyracks job cancellation is not synchronous, re-executing rebalance could + // fail, but we keep retrying until it completes. + boolean done = false; + do { + try { + // Re-executes rebalance. + super.executeHttpRequest(fmt, statement, variableCtx, reqType, testFile, expectedResultFile, + actualResultFile, queryCount, numResultFiles, extension, compare); + done = true; + } catch (Exception e2) { + String errorMsg = ExceptionUtils.getErrorMessage(e2); + // not expected, but is a false alarm. + if (errorMsg == null || !errorMsg.contains("reference count = 1")) { + return e2; + } + LOGGER.log(Level.WARNING, e2.toString(), e2); + } + } while (!done); + } + return null; + }); + Thread.sleep(waitTime); + // Cancels the query request while the query is executing. + int rc = cancelQuery(getEndpoint(Servlets.REBALANCE), Collections.emptyList()); + Assert.assertTrue(rc == 200 || rc == 404); + Exception e = future.get(); + if (e != null) { + throw e; + } + } + + // Cancels a submitted query through the cancellation REST API. + private int cancelQuery(URI uri, List<TestCase.CompilationUnit.Parameter> params) throws Exception { + HttpUriRequest method = constructDeleteMethodUrl(uri, params); + HttpResponse response = executeHttpRequest(method); + return response.getStatusLine().getStatusCode(); + } + + // Constructs a HTTP DELETE request. 
+ private HttpUriRequest constructDeleteMethodUrl(URI uri, List<TestCase.CompilationUnit.Parameter> otherParams) { + RequestBuilder builder = RequestBuilder.delete(uri); + for (TestCase.CompilationUnit.Parameter param : otherParams) { + builder.addParameter(param.getName(), param.getValue()); + } + builder.setCharset(StandardCharsets.UTF_8); + return builder.build(); + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index ed6a77a..8791756 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -1058,9 +1058,9 @@ } } - private void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx, String reqType, - File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount, int numResultFiles, - String extension, ComparisonEnum compare) throws Exception { + protected void executeHttpRequest(OutputFormat fmt, String statement, Map<String, Object> variableCtx, + String reqType, File testFile, File expectedResultFile, File actualResultFile, MutableInt queryCount, + int numResultFiles, String extension, ComparisonEnum compare) throws Exception { String handleVar = getHandleVariable(statement); final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim(); final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java new file mode 100644 index 0000000..1d7bdc3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceWithCancellationIT.java @@ -0,0 +1,70 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.test.runtime; + +import java.util.Collection; + +import org.apache.asterix.test.common.RebalanceCancellationTestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Runs rebalance tests with cancellation. 
+ */ +@RunWith(Parameterized.class) +public class RebalanceWithCancellationIT { + protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + private static RebalanceCancellationTestExecutor executor = new RebalanceCancellationTestExecutor(); + + @BeforeClass + public static void setUp() throws Exception { + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, executor); + } + + @AfterClass + public static void tearDown() throws Exception { + LangExecutionUtil.tearDown(); + } + + @Parameters(name = "RebalanceWithCancellationIT {index}: {0}") + public static Collection<Object[]> tests() throws Exception { + return LangExecutionUtil.tests("only_sqlpp.xml", "rebalance.xml"); + } + + protected TestCaseContext tcCtx; + + public RebalanceWithCancellationIT(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + // Runs each single cancellation test multiple times and tests cancellation at various points of time. + for (int waitTime = 100; waitTime <= 1000; waitTime += 50) { + executor.setWaitTime(waitTime); + LangExecutionUtil.test(tcCtx); + } + } +} diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java index cce069c..fff0775 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppExecutionWithCancellationTest.java @@ -20,6 +20,7 @@ import java.util.Collection; +import org.apache.asterix.common.exceptions.ExceptionUtils; import org.apache.asterix.test.common.CancellationTestExecutor; import org.apache.asterix.testframework.context.TestCaseContext; import org.junit.AfterClass; @@ -70,7 +71,7 @@ try { LangExecutionUtil.test(tcCtx); } catch (Exception e) { - String errorMsg = 
CancellationTestExecutor.getErrorMessage(e); + String errorMsg = ExceptionUtils.getErrorMessage(e); if (!errorMsg.contains("reference count = 1") // not expected, but is a false alarm. && !errorMsg.contains("pinned and file is being closed") // not expected, but maybe a false alarm. ) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 51a535a..3da58e9 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -35,6 +35,7 @@ import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; @@ -263,19 +264,30 @@ @Override public synchronized void close(String resourcePath) throws HyracksDataException { - validateDatasetLifecycleManagerState(); - int did = getDIDfromResourcePath(resourcePath); - long resourceID = getResourceIDfromResourcePath(resourcePath); - DatasetResource dsr = datasets.get(did); - if (dsr == null) { - throw new HyracksDataException("No index found with resourceID " + resourceID); + DatasetResource dsr = null; + IndexInfo iInfo = null; + try { + validateDatasetLifecycleManagerState(); + int did = getDIDfromResourcePath(resourcePath); + long resourceID = getResourceIDfromResourcePath(resourcePath); + dsr = datasets.get(did); + if (dsr == null) { + throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID); + } + iInfo = 
dsr.getIndexInfo(resourceID); + if (iInfo == null) { + throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID); + } + } finally { + // Regardless of what exception is thrown in the try-block (e.g., line 279), + // we have to un-touch the index and dataset. + if (iInfo != null) { + iInfo.untouch(); + } + if (dsr != null) { + dsr.untouch(); + } } - IndexInfo iInfo = dsr.getIndexInfo(resourceID); - if (iInfo == null) { - throw new HyracksDataException("No index found with resourceID " + resourceID); - } - iInfo.untouch(); - dsr.untouch(); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java index 649f1f5..3105b3f 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java @@ -20,7 +20,6 @@ public class ExceptionUtils { public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n"; - public static final String MISSING_PARAMETER = "Missing parameter.\n"; public static final String PARAMETER_NAME = "Parameter name: "; public static final String EXPECTED_VALUE = "Expected value: "; public static final String PASSED_VALUE = "Passed value: "; @@ -32,4 +31,22 @@ return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE + expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue; } + + // Gets the error message for the root cause of a given Throwable instance. + public static String getErrorMessage(Throwable th) { + Throwable cause = getRootCause(th); + return cause.getMessage(); + } + + // Finds the root cause of a given Throwable instance. 
+ public static Throwable getRootCause(Throwable e) { + Throwable current = e; + Throwable cause = e.getCause(); + while (cause != null && cause != current) { + Throwable nextCause = current.getCause(); + current = cause; + cause = nextCause; + } + return current; + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 9e55a97..411f866 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -103,7 +103,14 @@ Dataset dataset) throws AlgebricksException { SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig); - return secondaryIndexHelper.buildDropJobSpec(); + return secondaryIndexHelper.buildDropJobSpec(false); + } + + public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider, + Dataset dataset, boolean failSilently) throws AlgebricksException { + SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper + .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig); + return secondaryIndexHelper.buildDropJobSpec(failSilently); } public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index d11ba21..8c45c11 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -170,7 +170,7 @@ public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException; - public abstract JobSpecification buildDropJobSpec() throws AlgebricksException; + public abstract JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException; protected void init() throws AlgebricksException { payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java index 907192c..2dcab4f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java @@ -66,14 +66,15 @@ } @Override - public JobSpecification buildDropJobSpec() throws AlgebricksException { + public JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); // The index drop operation should be persistent regardless of temp datasets or permanent dataset. 
- IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory); + IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, + failSilently); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second); spec.addRoot(btreeDrop); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index cb03ae4..e530bc3 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -213,13 +213,18 @@ public synchronized void delete(String relativePath) throws HyracksDataException { FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath); if (resourceFile.getFile().exists()) { - resourceFile.delete(); - resourceCache.invalidate(relativePath); - - //if replication enabled, delete resource from remote replicas - if (isReplicationEnabled - && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) { - createReplicationJob(ReplicationOperation.DELETE, resourceFile); + try { + // Invalidate before deleting the file just in case file deletion throws some exception. + // Since it's just a cache invalidation, it should not affect correctness. + resourceCache.invalidate(relativePath); + resourceFile.delete(); + } finally { + // Regardless of successfully deleted or not, the operation should be replicated. 
+ //if replication enabled, delete resource from remote replicas + if (isReplicationEnabled + && !resourceFile.getFile().getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) { + createReplicationJob(ReplicationOperation.DELETE, resourceFile); + } } } else { throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 8a91547..2e6c8a3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -116,6 +116,7 @@ public static final int CANNOT_CREATE_EXISTING_INDEX = 80; public static final int FILE_ALREADY_MAPPED = 81; public static final int FILE_ALREADY_EXISTS = 82; + public static final int NO_INDEX_FOUND_WITH_RESOURCE_ID = 83; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 9f983a7..6389ffa 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -99,5 +99,6 @@ 80 = Cannot create index because it already exists 81 = File %1$s is already mapped 82 = Failed to create the file %1$s because it already exists +83 = No index found with resourceID %1$s 10000 = The given rule collection %1$s is not an instance of the List class. 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java index d162be0..18c7107 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java @@ -30,16 +30,23 @@ private static final long serialVersionUID = 1L; private final IIndexDataflowHelperFactory dataflowHelperFactory; + private final boolean failSilently; public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexDataflowHelperFactory dataflowHelperFactory) { + this(spec, dataflowHelperFactory, false); + } + + public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec, + IIndexDataflowHelperFactory dataflowHelperFactory, boolean failSilently) { super(spec, 0, 0); this.dataflowHelperFactory = dataflowHelperFactory; + this.failSilently = failSilently; } @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new IndexDropOperatorNodePushable(dataflowHelperFactory, ctx, partition); + return new IndexDropOperatorNodePushable(dataflowHelperFactory, failSilently, ctx, partition); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java index fce31ca..7c2021b 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java @@ -28,10 +28,12 @@ public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable { private final IIndexDataflowHelper indexHelper; + private final boolean failSliently; - public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, IHyracksTaskContext ctx, - int partition) throws HyracksDataException { + public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, boolean failSilently, + IHyracksTaskContext ctx, int partition) throws HyracksDataException { this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); + this.failSliently = failSilently; } @Override @@ -50,7 +52,13 @@ @Override public void initialize() throws HyracksDataException { - indexHelper.destroy(); + try { + indexHelper.destroy(); + } catch (HyracksDataException e) { + if (!failSliently) { + throw e; + } + } } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/1821 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7 Gerrit-PatchSet: 15 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <buyin...@gmail.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>