Repository: lens
Updated Branches:
  refs/heads/master 7f3731ef4 -> cb48aa386


LENS-1413: Estimates failing with Task rejection errors


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/cb48aa38
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/cb48aa38
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/cb48aa38

Branch: refs/heads/master
Commit: cb48aa3866ec29ba68490cfdadf0611779303ef1
Parents: 7f3731e
Author: Lavkesh Lahngir <lavk...@linux.com>
Authored: Tue Apr 25 15:47:31 2017 +0530
Committer: Puneet <puneet.gu...@inmobi.com>
Committed: Tue Apr 25 15:47:31 2017 +0530

----------------------------------------------------------------------
 lens-api/src/main/resources/lens-errors.conf    |  7 ++
 .../lens/server/error/LensServerErrorCode.java  |  3 +-
 .../server/query/QueryExecutionServiceImpl.java | 28 +++++-
 .../lens/server/common/FailingQueryDriver.java  | 16 ++++
 .../lens/server/query/TestQueryService.java     | 97 +++++++++++++++++++-
 lens-server/src/test/resources/lens-site.xml    |  5 +
 6 files changed, 148 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-api/src/main/resources/lens-errors.conf
----------------------------------------------------------------------
diff --git a/lens-api/src/main/resources/lens-errors.conf 
b/lens-api/src/main/resources/lens-errors.conf
index 236d678..e5536bb 100644
--- a/lens-api/src/main/resources/lens-errors.conf
+++ b/lens-api/src/main/resources/lens-errors.conf
@@ -120,6 +120,13 @@ lensServerErrors = [
     httpStatusCode = ${BAD_REQUEST}
     errorMsg = "Unsupported null value received for argument type: %s"
   }
+
+  {
+    errorCode = 2008
+    httpStatusCode = ${INTERNAL_SERVER_ERROR}
+    errorMsg = "Server is overloaded at this time"
+  }
+
 ]
 
 # lensCubeErrors: Defined for lens-cube module

http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java
 
b/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java
index 14a31e2..b9150e9 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/error/LensServerErrorCode.java
@@ -28,7 +28,8 @@ public enum LensServerErrorCode {
   TOO_MANY_OPEN_SESSIONS(2004, 0),
   SESSION_CLOSED(2005, 0),
   INVALID_HANDLE(2006, 0),
-  NULL_OR_EMPTY_ARGUMENT(2007, 0);
+  NULL_OR_EMPTY_ARGUMENT(2007, 0),
+  SERVER_OVERLOADED(2008, 0);
 
   public LensErrorInfo getLensErrorInfo() {
     return this.errorInfo;

http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index c6fbeda..e70d290 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.lens.server.query;
 import static org.apache.lens.api.query.QueryStatus.Status.*;
 import static org.apache.lens.server.api.LensConfConstants.*;
 import static org.apache.lens.server.api.util.LensUtil.getImplementations;
+import static 
org.apache.lens.server.error.LensServerErrorCode.SERVER_OVERLOADED;
 import static org.apache.lens.server.session.LensSessionImpl.ResourceEntry;
 
 import java.io.*;
@@ -64,7 +65,10 @@ import org.apache.lens.server.api.query.comparators.*;
 import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
 import org.apache.lens.server.api.query.cost.QueryCost;
 import org.apache.lens.server.api.query.events.*;
-import org.apache.lens.server.api.retry.*;
+import org.apache.lens.server.api.retry.BackOffRetryHandler;
+import org.apache.lens.server.api.retry.ChainedRetryPolicyDecider;
+import org.apache.lens.server.api.retry.OperationRetryHandlerFactory;
+import org.apache.lens.server.api.retry.RetryPolicyDecider;
 import org.apache.lens.server.api.util.LensUtil;
 import org.apache.lens.server.model.LogSegregationContext;
 import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
@@ -1698,6 +1702,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       List<RewriteEstimateRunnable> runnables = new 
ArrayList<RewriteEstimateRunnable>(numDrivers);
       List<Future> estimateFutures = new ArrayList<Future>();
 
+      boolean cancelAllDueToOverload = false;
       for (final LensDriver driver : ctx.getDriverContext().getDrivers()) {
         RewriteEstimateRunnable r = new RewriteEstimateRunnable(driver,
           rewriteRunnables.get(driver),
@@ -1705,10 +1710,25 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
           ctx, estimateCompletionLatch);
 
         // Submit composite rewrite + estimate operation to background pool
-        estimateFutures.add(estimatePool.submit(r));
-        runnables.add(r);
+        try {
+          estimateFutures.add(estimatePool.submit(r));
+          runnables.add(r);
+        } catch (RejectedExecutionException e) {
+          // This means the server can't accept anymore estimate requests at 
this time.
+          // Cancel all other futures.
+          cancelAllDueToOverload = true;
+          log.warn("Rejected from submitting to the estimate pool for driver 
{} ", r.getDriver(), e);
+          break;
+        }
+      }
+      if (cancelAllDueToOverload) {
+        for (int i = 0; i < runnables.size(); i++) {
+          RewriteEstimateRunnable r = runnables.get(i);
+          estimateFutures.get(i).cancel(true);
+          log.info("Cancelling estimate tasks for driver due to incomplete 
driver {}", r.getDriver());
+        }
+        throw new LensException(SERVER_OVERLOADED.getLensErrorInfo());
       }
-
       // Wait for all rewrite and estimates to finish
       try {
         long estimateLatchTimeout = 
ctx.getConf().getLong(ESTIMATE_TIMEOUT_MILLIS,

http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java
 
b/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java
index 7e9133e..5525fa3 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/common/FailingQueryDriver.java
@@ -21,6 +21,7 @@ package org.apache.lens.server.common;
 
 import javax.ws.rs.NotFoundException;
 
+import org.apache.lens.api.LensConf;
 import org.apache.lens.server.api.driver.DriverQueryPlan;
 import org.apache.lens.server.api.driver.MockDriver;
 import org.apache.lens.server.api.error.LensException;
@@ -29,12 +30,27 @@ import org.apache.lens.server.api.query.QueryContext;
 import org.apache.lens.server.api.query.cost.FactPartitionBasedQueryCost;
 import org.apache.lens.server.api.query.cost.QueryCost;
 
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class FailingQueryDriver extends MockDriver {
 
   @Override
   public QueryCost estimate(final AbstractQueryContext ctx) throws 
LensException {
+    LensConf lensConf = ctx.getLensConf();
+    String driverSleep = lensConf.getProperty("mock.driver.sleep");
     if (ctx.getUserQuery().contains("fail")) {
       return new FactPartitionBasedQueryCost(0.0);
+    } else if (driverSleep != null && driverSleep.equals("true")) {
+      try {
+        String sleepMS = lensConf.getProperty("mock.driver.sleep.ms");
+        if (sleepMS != null && !sleepMS.isEmpty()) {
+          Thread.sleep(Long.parseLong(sleepMS));
+        }
+      } catch (InterruptedException e) {
+        log.error("Sleeping thread interrupted", e);
+      }
+      return new FactPartitionBasedQueryCost(0.0);
     } else {
       throw new LensException("Simulated Estimate Failure");
     }

http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index 7d8c977..1149696 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -23,21 +23,32 @@ import static javax.ws.rs.core.Response.Status.*;
 
 import static org.apache.lens.server.LensServerTestUtil.DB_WITH_JARS;
 import static org.apache.lens.server.LensServerTestUtil.DB_WITH_JARS_2;
+
 import static org.apache.lens.server.api.LensServerAPITestUtil.getLensConf;
+
 import static org.apache.lens.server.api.user.MockDriverQueryHook.*;
+
 import static org.apache.lens.server.common.RestAPITestUtil.*;
 
 import static org.testng.Assert.*;
 
 import java.io.*;
 import java.net.URLEncoder;
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.*;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.*;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 import org.apache.lens.api.APIResult;
 import org.apache.lens.api.LensConf;
@@ -91,7 +102,6 @@ import org.testng.annotations.*;
 
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Optional;
-
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -2021,4 +2031,85 @@ public class TestQueryService extends LensJerseyTest {
     assertTrue((lensQuery.getFinishTime() - lensQuery.getLaunchTime()) < 400, 
"Query time is "
       + (lensQuery.getFinishTime() - lensQuery.getLaunchTime()));
   }
+
+  @Test(dataProvider = "mediaTypeData")
+  public void testEstimateRejectionException(MediaType mt) throws Exception {
+    class EstimateRunnable implements Runnable {
+      boolean failed = false;
+      boolean completed = false;
+      boolean wrongMessage = true;
+
+      @Override
+      public void run() {
+        Map<String, String> sessionconf = new HashMap<>();
+        sessionconf.put("test.session.key", "svalue");
+        LensSessionHandle sessionhandle = null;
+        try {
+          sessionhandle = queryService.openSession("foo@localhost", "bar", 
sessionconf);
+          final WebTarget target = target().path("queryapi/queries");
+          LensConf lensConf = new LensConf();
+          lensConf.addProperty("mock.driver.sleep", "true");
+          lensConf.addProperty("mock.driver.sleep.ms", "500");
+          final FormDataMultiPart mp = new FormDataMultiPart();
+          mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), 
sessionhandle, mt));
+          mp.bodyPart(
+            new 
FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID 
from " + TEST_TABLE));
+          mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("operation").build(), 
"estimate"));
+          mp.bodyPart(
+            new 
FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(),
 lensConf, mt));
+          Response response = target.request(mt).post(Entity.entity(mp, 
MediaType.MULTIPART_FORM_DATA_TYPE));
+          LensAPIResult result = response.readEntity(LensAPIResult.class);
+          if (response.getStatus() == 500) {
+            failed = true;
+            if (result.getLensErrorTO().getMessage().equals("Server is 
overloaded at this time")) {
+              wrongMessage = false;
+            }
+          }
+          if (response.getStatus() == 200) {
+            completed = true;
+          }
+        } catch (LensException e) {
+          log.error("Error in the EstimateRunnable thread while creating a 
session", e);
+        } finally {
+          try {
+            queryService.closeSession(sessionhandle);
+          } catch (LensException e) {
+            log.error("Error in the EstimateRunnable thread while closing the 
session", e);
+          }
+        }
+      }
+    }
+    List<Thread> threads = new ArrayList<>();
+    List<EstimateRunnable> estimateRunnables = new ArrayList<>();
+
+    int totalTasks = 10;
+    for (int i = 0; i < totalTasks; i++) {
+      EstimateRunnable r = new EstimateRunnable();
+      estimateRunnables.add(r);
+      Thread t = new Thread(r);
+      threads.add(t);
+      t.start();
+    }
+    // Wait for them to finish
+    for (Thread t : threads) {
+      t.join();
+    }
+    List<EstimateRunnable> completedTaks = 
estimateRunnables.stream().filter(new Predicate<EstimateRunnable>() {
+      @Override
+      public boolean test(EstimateRunnable estimateRunnable) {
+        return estimateRunnable.completed;
+      }
+    }).collect(Collectors.toList());
+    List<EstimateRunnable> inCompleteTasks = 
estimateRunnables.stream().filter(new Predicate<EstimateRunnable>() {
+      @Override
+      public boolean test(EstimateRunnable estimateRunnable) {
+        // If estimate was failed, it should only be because of the server 
getting overloaded.
+        return estimateRunnable.failed && !estimateRunnable.wrongMessage;
+      }
+    }).collect(Collectors.toList());
+
+    assertTrue(completedTaks.size() > 0);
+    assertTrue(inCompleteTasks.size() > 0);
+    assertEquals(completedTaks.size() + inCompleteTasks.size(), totalTasks);
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/cb48aa38/lens-server/src/test/resources/lens-site.xml
----------------------------------------------------------------------
diff --git a/lens-server/src/test/resources/lens-site.xml 
b/lens-server/src/test/resources/lens-site.xml
index 334a9c4..0060fa7 100644
--- a/lens-server/src/test/resources/lens-site.xml
+++ b/lens-server/src/test/resources/lens-site.xml
@@ -192,6 +192,11 @@
     <value>20</value>
     <description>Number of sessions can be allowed for each user.</description>
   </property>
+    <property>
+    <name>lens.server.estimate.pool.max.threads</name>
+    <value>20</value>
+    <description>Lens server estimate max threads</description>
+  </property>
   <property>
     <name>lens.server.status.update.exponential.wait.millis</name>
     <value>1000</value>

Reply via email to