Repository: lens Updated Branches: refs/heads/master 7f3731ef4 -> cb48aa386
LENS-1413: Estimates failing with Task rejection errors Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/cb48aa38 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/cb48aa38 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/cb48aa38 Branch: refs/heads/master Commit: cb48aa3866ec29ba68490cfdadf0611779303ef1 Parents: 7f3731e Author: Lavkesh Lahngir <lavk...@linux.com> Authored: Tue Apr 25 15:47:31 2017 +0530 Committer: Puneet <puneet.gu...@inmobi.com> Committed: Tue Apr 25 15:47:31 2017 +0530 ---------------------------------------------------------------------- lens-api/src/main/resources/lens-errors.conf | 7 ++ .../lens/server/error/LensServerErrorCode.java | 3 +- .../server/query/QueryExecutionServiceImpl.java | 28 +++++- .../lens/server/common/FailingQueryDriver.java | 16 ++++ .../lens/server/query/TestQueryService.java | 97 +++++++++++++++++++- lens-server/src/test/resources/lens-site.xml | 5 + 6 files changed, 148 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-api/src/main/resources/lens-errors.conf ---------------------------------------------------------------------- diff --git a/lens-api/src/main/resources/lens-errors.conf b/lens-api/src/main/resources/lens-errors.conf index 236d678..e5536bb 100644 --- a/lens-api/src/main/resources/lens-errors.conf +++ b/lens-api/src/main/resources/lens-errors.conf @@ -120,6 +120,13 @@ lensServerErrors = [ httpStatusCode = ${BAD_REQUEST} errorMsg = "Unsupported null value received for argument type: %s" } + + { + errorCode = 2008 + httpStatusCode = ${INTERNAL_SERVER_ERROR} + errorMsg = "Server is overloaded at this time" + } + ] # lensCubeErrors: Defined for lens-cube module http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java b/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java index 14a31e2..b9150e9 100644 --- a/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java +++ b/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java @@ -28,7 +28,8 @@ public enum LensServerErrorCode { TOO_MANY_OPEN_SESSIONS(2004, 0), SESSION_CLOSED(2005, 0), INVALID_HANDLE(2006, 0), - NULL_OR_EMPTY_ARGUMENT(2007, 0); + NULL_OR_EMPTY_ARGUMENT(2007, 0), + SERVER_OVERLOADED(2008, 0); public LensErrorInfo getLensErrorInfo() { return this.errorInfo; http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index c6fbeda..e70d290 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -21,6 +21,7 @@ package org.apache.lens.server.query; import static org.apache.lens.api.query.QueryStatus.Status.*; import static org.apache.lens.server.api.LensConfConstants.*; import static org.apache.lens.server.api.util.LensUtil.getImplementations; +import static org.apache.lens.server.error.LensServerErrorCode.SERVER_OVERLOADED; import static org.apache.lens.server.session.LensSessionImpl.ResourceEntry; import java.io.*; @@ -64,7 +65,10 @@ import org.apache.lens.server.api.query.comparators.*; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; import org.apache.lens.server.api.query.cost.QueryCost; import org.apache.lens.server.api.query.events.*; -import org.apache.lens.server.api.retry.*; +import org.apache.lens.server.api.retry.BackOffRetryHandler; +import org.apache.lens.server.api.retry.ChainedRetryPolicyDecider; +import org.apache.lens.server.api.retry.OperationRetryHandlerFactory; +import org.apache.lens.server.api.retry.RetryPolicyDecider; import org.apache.lens.server.api.util.LensUtil; import org.apache.lens.server.model.LogSegregationContext; import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; @@ -1698,6 +1702,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE List<RewriteEstimateRunnable> runnables = new ArrayList<RewriteEstimateRunnable>(numDrivers); List<Future> estimateFutures = new ArrayList<Future>(); + boolean cancelAllDueToOverload = false; for (final LensDriver driver : ctx.getDriverContext().getDrivers()) { RewriteEstimateRunnable r = new RewriteEstimateRunnable(driver, rewriteRunnables.get(driver), @@ -1705,10 +1710,25 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE ctx, estimateCompletionLatch); // Submit composite rewrite + estimate operation to background pool - estimateFutures.add(estimatePool.submit(r)); - runnables.add(r); + try { + estimateFutures.add(estimatePool.submit(r)); + runnables.add(r); + } catch (RejectedExecutionException e) { + // This means the server can't accept anymore estimate requests at this time. + // Cancel all other futures. + cancelAllDueToOverload = true; + log.warn("Rejected from submitting to the estimate pool for driver {} ", r.getDriver(), e); + break; + } + } + if (cancelAllDueToOverload) { + for (int i = 0; i < runnables.size(); i++) { + RewriteEstimateRunnable r = runnables.get(i); + estimateFutures.get(i).cancel(true); + log.info("Cancelling estimate tasks for driver due to incomplete driver {}", r.getDriver()); + } + throw new LensException(SERVER_OVERLOADED.getLensErrorInfo()); } - // Wait for all rewrite and estimates to finish try { long estimateLatchTimeout = ctx.getConf().getLong(ESTIMATE_TIMEOUT_MILLIS, http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java b/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java index 7e9133e..5525fa3 100644 --- a/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java +++ b/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java @@ -21,6 +21,7 @@ package org.apache.lens.server.common; import javax.ws.rs.NotFoundException; +import org.apache.lens.api.LensConf; import org.apache.lens.server.api.driver.DriverQueryPlan; import org.apache.lens.server.api.driver.MockDriver; import org.apache.lens.server.api.error.LensException; @@ -29,12 +30,27 @@ import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.cost.FactPartitionBasedQueryCost; import org.apache.lens.server.api.query.cost.QueryCost; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class FailingQueryDriver extends MockDriver { @Override public QueryCost estimate(final AbstractQueryContext ctx) throws LensException { + LensConf lensConf = ctx.getLensConf(); + String driverSleep = lensConf.getProperty("mock.driver.sleep"); if (ctx.getUserQuery().contains("fail")) { return new FactPartitionBasedQueryCost(0.0); + } else if (driverSleep != null && driverSleep.equals("true")) { + try { + String sleepMS = lensConf.getProperty("mock.driver.sleep.ms"); + if (sleepMS != null && !sleepMS.isEmpty()) { + Thread.sleep(Long.parseLong(sleepMS)); + } + } catch (InterruptedException e) { + log.error("Sleeping thread interrupted", e); + } + return new FactPartitionBasedQueryCost(0.0); } else { throw new LensException("Simulated Estimate Failure"); } http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java index 7d8c977..1149696 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java @@ -23,21 +23,32 @@ import static javax.ws.rs.core.Response.Status.*; import static org.apache.lens.server.LensServerTestUtil.DB_WITH_JARS; import static org.apache.lens.server.LensServerTestUtil.DB_WITH_JARS_2; + import static org.apache.lens.server.api.LensServerAPITestUtil.getLensConf; + import static org.apache.lens.server.api.user.MockDriverQueryHook.*; + import static org.apache.lens.server.common.RestAPITestUtil.*; import static org.testng.Assert.*; import java.io.*; import java.net.URLEncoder; -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; import java.util.*; +import java.util.function.Predicate; +import java.util.stream.Collectors; import javax.ws.rs.NotFoundException; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.*; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.apache.lens.api.APIResult; import org.apache.lens.api.LensConf; @@ -91,7 +102,6 @@ import org.testng.annotations.*; import com.codahale.metrics.MetricRegistry; import com.google.common.base.Optional; - import lombok.extern.slf4j.Slf4j; /** @@ -2021,4 +2031,85 @@ public class TestQueryService extends LensJerseyTest { assertTrue((lensQuery.getFinishTime() - lensQuery.getLaunchTime()) < 400, "Query time is " + (lensQuery.getFinishTime() - lensQuery.getLaunchTime())); } + + @Test(dataProvider = "mediaTypeData") + public void testEstimateRejectionException(MediaType mt) throws Exception { + class EstimateRunnable implements Runnable { + boolean failed = false; + boolean completed = false; + boolean wrongMessage = true; + + @Override + public void run() { + Map<String, String> sessionconf = new HashMap<>(); + sessionconf.put("test.session.key", "svalue"); + LensSessionHandle sessionhandle = null; + try { + sessionhandle = queryService.openSession("foo@localhost", "bar", sessionconf); + final WebTarget target = target().path("queryapi/queries"); + LensConf lensConf = new LensConf(); + lensConf.addProperty("mock.driver.sleep", "true"); + lensConf.addProperty("mock.driver.sleep.ms", "500"); + final FormDataMultiPart mp = new FormDataMultiPart(); + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), sessionhandle, mt)); + mp.bodyPart( + new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID from " + TEST_TABLE)); + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "estimate")); + mp.bodyPart( + new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), lensConf, mt)); + Response response = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE)); + LensAPIResult result = response.readEntity(LensAPIResult.class); + if (response.getStatus() == 500) { + failed = true; + if (result.getLensErrorTO().getMessage().equals("Server is overloaded at this time")) { + wrongMessage = false; + } + } + if (response.getStatus() == 200) { + completed = true; + } + } catch (LensException e) { + log.error("Error in the EstimateRunnable thread while creating a session", e); + } finally { + try { + queryService.closeSession(sessionhandle); + } catch (LensException e) { + log.error("Error in the EstimateRunnable thread while closing the session", e); + } + } + } + } + List<Thread> threads = new ArrayList<>(); + List<EstimateRunnable> estimateRunnables = new ArrayList<>(); + + int totalTasks = 10; + for (int i = 0; i < totalTasks; i++) { + EstimateRunnable r = new EstimateRunnable(); + estimateRunnables.add(r); + Thread t = new Thread(r); + threads.add(t); + t.start(); + } + // Wait for them to finish + for (Thread t : threads) { + t.join(); + } + List<EstimateRunnable> completedTaks = estimateRunnables.stream().filter(new Predicate<EstimateRunnable>() { + @Override + public boolean test(EstimateRunnable estimateRunnable) { + return estimateRunnable.completed; + } + }).collect(Collectors.toList()); + List<EstimateRunnable> inCompleteTasks = estimateRunnables.stream().filter(new Predicate<EstimateRunnable>() { + @Override + public boolean test(EstimateRunnable estimateRunnable) { + // If estimate was failed, it should only be because of the server getting overloaded. + return estimateRunnable.failed && !estimateRunnable.wrongMessage; + } + }).collect(Collectors.toList()); + + assertTrue(completedTaks.size() > 0); + assertTrue(inCompleteTasks.size() > 0); + assertEquals(completedTaks.size() + inCompleteTasks.size(), totalTasks); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/test/resources/lens-site.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/test/resources/lens-site.xml b/lens-server/src/test/resources/lens-site.xml index 334a9c4..0060fa7 100644 --- a/lens-server/src/test/resources/lens-site.xml +++ b/lens-server/src/test/resources/lens-site.xml @@ -192,6 +192,11 @@ <value>20</value> <description>Number of sessions can be allowed for each user.</description> </property> + <property> + <name>lens.server.estimate.pool.max.threads</name> + <value>20</value> + <description>Lens server estimate max threads</description> + </property> <property> <name>lens.server.status.update.exponential.wait.millis</name> <value>1000</value>