Repository: lens Updated Branches: refs/heads/current-release-line 837e39944 -> 7c1171c8d
http://git-wip-us.apache.org/repos/asf/lens/blob/97fe26d3/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java ---------------------------------------------------------------------- diff --git a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java index aeae8aa..1a7a391 100644 --- a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java +++ b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java @@ -19,7 +19,6 @@ package org.apache.lens.regression.throttling; -import java.io.IOException; import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.TimeUnit; @@ -37,7 +36,6 @@ import org.apache.lens.regression.core.helpers.*; import org.apache.lens.regression.core.testHelper.BaseTestClass; import org.apache.lens.regression.util.Util; import org.apache.lens.server.api.LensConfConstants; -import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.util.LensUtil; import org.apache.log4j.Logger; @@ -45,9 +43,6 @@ import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.*; -import com.jcraft.jsch.JSchException; -import com.jcraft.jsch.SftpException; - public class ITCostTests extends BaseTestClass { private WebTarget servLens; @@ -64,7 +59,7 @@ public class ITCostTests extends BaseTestClass { public static final String JDBC_QUERY1 = QueryInventory.getQueryFromInventory("JDBC.QUERY1"); private static String hiveDriver = "hive/hive1"; - private String hiveDriverSitePath = lens.getServerDir() + "/conf/drivers/hive/hive1/hivedriver-site.xml"; + private String lensSitePath = lens.getServerDir() + "/conf/lens-site.xml"; private static final long SECONDS_IN_A_MINUTE = 60; private static Logger logger = Logger.getLogger(ITCostTests.class); @@ -75,6 +70,11 @@ public class ITCostTests extends 
BaseTestClass { logger.info("Creating a new Session"); sessionHandleString = sHelper.openSession(lens.getCurrentDB()); sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); + + HashMap<String, String> map = LensUtil.getHashMap(LensConfConstants.TOTAL_QUERY_COST_CEILING_PER_USER_KEY, "60", + LensConfConstants.QUERY_LAUNCHING_CONSTRAINT_FACTORIES_KEY, DriverConfig.USER_COST_CONSTRAINT_FACTORY); + Util.changeConfig(map, lensSitePath); + lens.restart(); } @BeforeMethod(alwaysRun = true) @@ -94,29 +94,10 @@ public class ITCostTests extends BaseTestClass { public void closeSession() throws Exception { logger.info("Closing Session"); sHelper.closeSession(); - } - - @BeforeGroups("user-cost-ceiling") - public void setUserCeilingconfig() throws Exception { - try{ - HashMap<String, String> map = LensUtil.getHashMap(LensConfConstants.TOTAL_QUERY_COST_CEILING_PER_USER_KEY, "60", - DriverConfig.HIVE_CONSTRAINT_FACTORIES, - DriverConfig.MAX_CONCURRENT_CONSTRAINT_FACTORY + "," + DriverConfig.USER_COST_CONSTRAINT_FACTORY, - DriverConfig.MAX_CONCURRENT_QUERIES, "10"); - Util.changeConfig(map, hiveDriverSitePath); - lens.restart(); - }catch (Exception e){ - logger.info(e); - } - } - - @AfterGroups("user-cost-ceiling") - public void restoreConfig() throws SftpException, JSchException, InterruptedException, LensException, IOException { - Util.changeConfig(hiveDriverSitePath); + Util.changeConfig(lensSitePath); lens.restart(); } - @Test(enabled = true, groups= "user-cost-ceiling") public void testUserCostCeiling() throws Exception { @@ -293,148 +274,5 @@ public class ITCostTests extends BaseTestClass { //TODO : Add queue level throttling along with user ceiling constraint - /* - * LENS-995 : Queue number shouldn't change with in the same prority - */ - - @Test(enabled = true) - public void queueNumberChangeWithInSamePriority() throws Exception { - - String longRunningQuery = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "20"); - 
HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "1"); - String[] queries = {longRunningQuery, COST_5, COST_5, COST_3, COST_2}; - - try { - Util.changeConfig(map, hiveDriverSitePath); - lens.restart(); - - List<QueryHandle> handleList = new ArrayList<>(); - for(String query : queries){ - handleList.add((QueryHandle) qHelper.executeQuery(query).getData()); - } - - LensQuery lq1 = qHelper.getLensQuery(sessionHandleString, handleList.get(1)); - LensQuery lq2 = qHelper.getLensQuery(sessionHandleString, handleList.get(2)); - LensQuery lq3 = qHelper.getLensQuery(sessionHandleString, handleList.get(3)); - LensQuery lq4 = qHelper.getLensQuery(sessionHandleString, handleList.get(4)); - - Assert.assertEquals(lq1.getStatus().getQueueNumber().intValue(), 1); - Assert.assertEquals(lq2.getStatus().getQueueNumber().intValue(), 2); - Assert.assertEquals(lq3.getStatus().getQueueNumber().intValue(), 3); - Assert.assertEquals(lq4.getStatus().getQueueNumber().intValue(), 4); - - LensQuery lq0 = qHelper.waitForCompletion(handleList.get(0)); - - lq1 = qHelper.getLensQuery(sessionHandleString, handleList.get(1)); - lq2 = qHelper.getLensQuery(sessionHandleString, handleList.get(2)); - lq3 = qHelper.getLensQuery(sessionHandleString, handleList.get(3)); - lq4 = qHelper.getLensQuery(sessionHandleString, handleList.get(4)); - - Assert.assertEquals(lq0.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - Assert.assertEquals(lq2.getStatus().getQueueNumber().intValue(), 1); - Assert.assertEquals(lq3.getStatus().getQueueNumber().intValue(), 2); - Assert.assertEquals(lq4.getStatus().getQueueNumber().intValue(), 3); - - lq1 = qHelper.waitForCompletion(handleList.get(1)); - - lq2 = qHelper.getLensQuery(sessionHandleString, handleList.get(2)); - lq3 = qHelper.getLensQuery(sessionHandleString, handleList.get(3)); - lq4 = qHelper.getLensQuery(sessionHandleString, handleList.get(4)); - - Assert.assertEquals(lq1.getStatus().getStatus(), 
QueryStatus.Status.SUCCESSFUL); - Assert.assertEquals(lq3.getStatus().getQueueNumber().intValue(), 1); - Assert.assertEquals(lq4.getStatus().getQueueNumber().intValue(), 2); - - }finally { - Util.changeConfig(hiveDriverSitePath); - lens.restart(); - } - } - - - @Test(enabled = true) - public void queueNumberChangeDifferentPriority() throws Exception { - - String longRunningQuery = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "20"); - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "1"); - try { - Util.changeConfig(map, hiveDriverSitePath); - lens.restart(); - - QueryHandle q0 = (QueryHandle) qHelper.executeQuery(longRunningQuery).getData(); - QueryHandle q1 = (QueryHandle) qHelper.executeQuery(COST_20).getData(); - QueryHandle q2 = (QueryHandle) qHelper.executeQuery(COST_2).getData(); - - LensQuery normal1 = qHelper.getLensQuery(sessionHandleString, q1); - LensQuery high1 = qHelper.getLensQuery(sessionHandleString, q2); - - Assert.assertEquals(normal1.getStatus().getQueueNumber().intValue(), 2); - Assert.assertEquals(high1.getStatus().getQueueNumber().intValue(), 1); - - QueryHandle q3 = (QueryHandle) qHelper.executeQuery(COST_5).getData(); - - LensQuery high2 = qHelper.getLensQuery(sessionHandleString, q3); - high1 = qHelper.getLensQuery(sessionHandleString, q2); - normal1 = qHelper.getLensQuery(sessionHandleString, q1); - - Assert.assertEquals(normal1.getStatus().getQueueNumber().intValue(), 3); - Assert.assertEquals(high1.getStatus().getQueueNumber().intValue(), 1); - Assert.assertEquals(high2.getStatus().getQueueNumber().intValue(), 2); - - QueryHandle q4 = (QueryHandle) qHelper.executeQuery(COST_20).getData(); - - LensQuery normal2 = qHelper.getLensQuery(sessionHandleString, q4); - normal1 = qHelper.getLensQuery(sessionHandleString, q1); - high1 = qHelper.getLensQuery(sessionHandleString, q2); - high2 = qHelper.getLensQuery(sessionHandleString, q3); - - 
Assert.assertEquals(high1.getStatus().getQueueNumber().intValue(), 1); - Assert.assertEquals(high2.getStatus().getQueueNumber().intValue(), 2); - Assert.assertEquals(normal1.getStatus().getQueueNumber().intValue(), 3); - Assert.assertEquals(normal2.getStatus().getQueueNumber().intValue(), 4); - - }finally { - Util.changeConfig(hiveDriverSitePath); - lens.restart(); - } - } - - - @Test(enabled = true) - public void queueNumberChangeDifferentPriorityWithJdbc() throws Exception { - - String longRunningQuery = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "20"); - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "1"); - List<QueryHandle> handleList = new ArrayList<>(); - - try { - Util.changeConfig(map, hiveDriverSitePath); - lens.restart(); - - String[] queries = {COST_20, COST_2, COST_3, COST_60, COST_5, COST_10, COST_3}; - // Queue order is determined from priority and order in which queries are fired. - int[] queueNo = {5, 1, 2, 7, 3, 6, 4}; - - qHelper.executeQuery(longRunningQuery); - for(String query : queries){ - handleList.add((QueryHandle) qHelper.executeQuery(query).getData()); - qHelper.executeQuery(JDBC_QUERY1).getData(); - } - - List<LensQuery> lqList = new ArrayList<>(); - for(QueryHandle qh : handleList){ - lqList.add(qHelper.getLensQuery(sessionHandleString, qh)); - } - - for(int i = 0; i < lqList.size(); i++) { - Assert.assertEquals(lqList.get(i).getStatus().getQueueNumber().intValue(), queueNo[i]); - } - - }finally { - Util.changeConfig(hiveDriverSitePath); - lens.restart(); - } - } - } http://git-wip-us.apache.org/repos/asf/lens/blob/97fe26d3/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITQueueNumberTests.java ---------------------------------------------------------------------- diff --git a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITQueueNumberTests.java 
b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITQueueNumberTests.java new file mode 100644 index 0000000..b7cea6b --- /dev/null +++ b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITQueueNumberTests.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.lens.regression.throttling; + +import java.lang.reflect.Method; +import java.util.*; + +import javax.ws.rs.client.WebTarget; + +import org.apache.lens.api.query.LensQuery; +import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.api.query.QueryStatus; +import org.apache.lens.cube.parse.CubeQueryConfUtil; +import org.apache.lens.regression.core.constants.DriverConfig; +import org.apache.lens.regression.core.constants.QueryInventory; +import org.apache.lens.regression.core.helpers.ServiceManagerHelper; +import org.apache.lens.regression.core.testHelper.BaseTestClass; +import org.apache.lens.regression.util.Util; +import org.apache.lens.server.api.util.LensUtil; + +import org.apache.log4j.Logger; + +import org.testng.Assert; +import org.testng.annotations.*; + +public class ITQueueNumberTests extends BaseTestClass { + + private WebTarget servLens; + private String sessionHandleString; + + public static final String COST_95 = QueryInventory.getQueryFromInventory("HIVE.COST_95"); + public static final String COST_60 = QueryInventory.getQueryFromInventory("HIVE.COST_60"); + public static final String COST_30 = QueryInventory.getQueryFromInventory("HIVE.COST_30"); + public static final String COST_20 = QueryInventory.getQueryFromInventory("HIVE.COST_20"); + public static final String COST_10 = QueryInventory.getQueryFromInventory("HIVE.COST_10"); + public static final String COST_5 = QueryInventory.getQueryFromInventory("HIVE.COST_5"); + public static final String COST_3 = QueryInventory.getQueryFromInventory("HIVE.COST_3"); + public static final String COST_2 = QueryInventory.getQueryFromInventory("HIVE.COST_2"); + public static final String JDBC_QUERY1 = QueryInventory.getQueryFromInventory("JDBC.QUERY1"); + + private String hiveDriverSitePath = lens.getServerDir() + "/conf/drivers/hive/hive1/hivedriver-site.xml"; + private static Logger logger = Logger.getLogger(ITQueueNumberTests.class); + + @BeforeClass(alwaysRun = true) + 
public void initialize() throws Exception { + servLens = ServiceManagerHelper.init(); + logger.info("Creating a new Session"); + sessionHandleString = sHelper.openSession(lens.getCurrentDB()); + sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); + } + + @BeforeMethod(alwaysRun = true) + public void setUp(Method method) throws Exception { + logger.info("Test Name: " + method.getName()); + } + + @AfterMethod(alwaysRun = true) + public void afterMethod(Method method) throws Exception { + logger.info("Test Name: " + method.getName()); + qHelper.killQuery(null, "QUEUED", "all"); + qHelper.killQuery(null, "RUNNING", "all"); + qHelper.killQuery(null, "EXECUTED", "all"); + } + + @AfterClass(alwaysRun = true) + public void closeSession() throws Exception { + logger.info("Closing Session"); + sHelper.closeSession(); + } + + + /* + * LENS-995 : Queue number shouldn't change with in the same prority + */ + + @Test(enabled = true) + public void queueNumberChangeWithInSamePriority() throws Exception { + + String longRunningQuery = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "20"); + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "1"); + String[] queries = {longRunningQuery, COST_5, COST_5, COST_3, COST_2}; + + try { + Util.changeConfig(map, hiveDriverSitePath); + lens.restart(); + + List<QueryHandle> handleList = new ArrayList<>(); + for(String query : queries){ + handleList.add((QueryHandle) qHelper.executeQuery(query).getData()); + } + + LensQuery lq1 = qHelper.getLensQuery(sessionHandleString, handleList.get(1)); + LensQuery lq2 = qHelper.getLensQuery(sessionHandleString, handleList.get(2)); + LensQuery lq3 = qHelper.getLensQuery(sessionHandleString, handleList.get(3)); + LensQuery lq4 = qHelper.getLensQuery(sessionHandleString, handleList.get(4)); + + Assert.assertEquals(lq1.getStatus().getQueueNumber().intValue(), 1); + 
Assert.assertEquals(lq2.getStatus().getQueueNumber().intValue(), 2); + Assert.assertEquals(lq3.getStatus().getQueueNumber().intValue(), 3); + Assert.assertEquals(lq4.getStatus().getQueueNumber().intValue(), 4); + + LensQuery lq0 = qHelper.waitForCompletion(handleList.get(0)); + + lq1 = qHelper.getLensQuery(sessionHandleString, handleList.get(1)); + lq2 = qHelper.getLensQuery(sessionHandleString, handleList.get(2)); + lq3 = qHelper.getLensQuery(sessionHandleString, handleList.get(3)); + lq4 = qHelper.getLensQuery(sessionHandleString, handleList.get(4)); + + Assert.assertEquals(lq0.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + Assert.assertEquals(lq2.getStatus().getQueueNumber().intValue(), 1); + Assert.assertEquals(lq3.getStatus().getQueueNumber().intValue(), 2); + Assert.assertEquals(lq4.getStatus().getQueueNumber().intValue(), 3); + + lq1 = qHelper.waitForCompletion(handleList.get(1)); + + lq2 = qHelper.getLensQuery(sessionHandleString, handleList.get(2)); + lq3 = qHelper.getLensQuery(sessionHandleString, handleList.get(3)); + lq4 = qHelper.getLensQuery(sessionHandleString, handleList.get(4)); + + Assert.assertEquals(lq1.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + Assert.assertEquals(lq3.getStatus().getQueueNumber().intValue(), 1); + Assert.assertEquals(lq4.getStatus().getQueueNumber().intValue(), 2); + + }finally { + Util.changeConfig(hiveDriverSitePath); + lens.restart(); + } + } + + + @Test(enabled = true) + public void queueNumberChangeDifferentPriority() throws Exception { + + String longRunningQuery = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "20"); + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "1"); + try { + Util.changeConfig(map, hiveDriverSitePath); + lens.restart(); + + QueryHandle q0 = (QueryHandle) qHelper.executeQuery(longRunningQuery).getData(); + QueryHandle q1 = (QueryHandle) qHelper.executeQuery(COST_20).getData(); + QueryHandle q2 = 
(QueryHandle) qHelper.executeQuery(COST_2).getData(); + + LensQuery normal1 = qHelper.getLensQuery(sessionHandleString, q1); + LensQuery high1 = qHelper.getLensQuery(sessionHandleString, q2); + + Assert.assertEquals(normal1.getStatus().getQueueNumber().intValue(), 2); + Assert.assertEquals(high1.getStatus().getQueueNumber().intValue(), 1); + + QueryHandle q3 = (QueryHandle) qHelper.executeQuery(COST_5).getData(); + + LensQuery high2 = qHelper.getLensQuery(sessionHandleString, q3); + high1 = qHelper.getLensQuery(sessionHandleString, q2); + normal1 = qHelper.getLensQuery(sessionHandleString, q1); + + Assert.assertEquals(normal1.getStatus().getQueueNumber().intValue(), 3); + Assert.assertEquals(high1.getStatus().getQueueNumber().intValue(), 1); + Assert.assertEquals(high2.getStatus().getQueueNumber().intValue(), 2); + + QueryHandle q4 = (QueryHandle) qHelper.executeQuery(COST_20).getData(); + + LensQuery normal2 = qHelper.getLensQuery(sessionHandleString, q4); + normal1 = qHelper.getLensQuery(sessionHandleString, q1); + high1 = qHelper.getLensQuery(sessionHandleString, q2); + high2 = qHelper.getLensQuery(sessionHandleString, q3); + + Assert.assertEquals(high1.getStatus().getQueueNumber().intValue(), 1); + Assert.assertEquals(high2.getStatus().getQueueNumber().intValue(), 2); + Assert.assertEquals(normal1.getStatus().getQueueNumber().intValue(), 3); + Assert.assertEquals(normal2.getStatus().getQueueNumber().intValue(), 4); + + }finally { + Util.changeConfig(hiveDriverSitePath); + lens.restart(); + } + } + + + @Test(enabled = true) + public void queueNumberChangeDifferentPriorityWithJdbc() throws Exception { + + String longRunningQuery = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "20"); + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "1"); + List<QueryHandle> handleList = new ArrayList<>(); + + try { + Util.changeConfig(map, hiveDriverSitePath); + lens.restart(); + + String[] queries = {COST_20, 
COST_2, COST_3, COST_60, COST_5, COST_10, COST_3}; + // Queue order is determined from priority and order in which queries are fired. + int[] queueNo = {5, 1, 2, 7, 3, 6, 4}; + + qHelper.executeQuery(longRunningQuery); + for(String query : queries){ + handleList.add((QueryHandle) qHelper.executeQuery(query).getData()); + qHelper.executeQuery(JDBC_QUERY1).getData(); + } + + List<LensQuery> lqList = new ArrayList<>(); + for(QueryHandle qh : handleList){ + lqList.add(qHelper.getLensQuery(sessionHandleString, qh)); + } + + for(int i = 0; i < lqList.size(); i++) { + Assert.assertEquals(lqList.get(i).getStatus().getQueueNumber().intValue(), queueNo[i]); + } + + }finally { + Util.changeConfig(hiveDriverSitePath); + lens.restart(); + } + } + +} http://git-wip-us.apache.org/repos/asf/lens/blob/97fe26d3/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java ---------------------------------------------------------------------- diff --git a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java new file mode 100644 index 0000000..6e34cb5 --- /dev/null +++ b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java @@ -0,0 +1,605 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lens.regression.throttling; + +import java.lang.reflect.Method; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.client.WebTarget; + +import org.apache.lens.api.Priority; +import org.apache.lens.api.query.LensQuery; +import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.api.query.QueryStatus; +import org.apache.lens.cube.parse.CubeQueryConfUtil; +import org.apache.lens.driver.hive.HiveDriver; +import org.apache.lens.regression.core.constants.DriverConfig; +import org.apache.lens.regression.core.constants.QueryInventory; +import org.apache.lens.regression.core.helpers.ServiceManagerHelper; +import org.apache.lens.regression.core.testHelper.BaseTestClass; +import org.apache.lens.regression.util.Util; +import org.apache.lens.server.api.LensConfConstants; +import org.apache.lens.server.api.util.LensUtil; + +import org.apache.log4j.Logger; + +import org.testng.Assert; +import org.testng.annotations.*; + +public class ITThrottlingTests extends BaseTestClass { + + WebTarget servLens; + String sessionHandleString; + + public static final String SLEEP_QUERY = QueryInventory.getSleepQuery("5"); + public static final String COST_95 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "5"); + public static final String COST_60 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_60"), "5"); + public static final String COST_30 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_30"), "5"); + public static final String COST_20 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_20"), "4"); + public static final String COST_10 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_10"), "4"); + public static final String COST_5 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "3"); + public static final String JDBC_QUERY1 = QueryInventory.getQueryFromInventory("JDBC.QUERY1"); + + private static String hiveDriver = "hive/hive1"; + private final String hiveDriverConf = lens.getServerDir() + "/conf/drivers/hive/hive1/hivedriver-site.xml"; + private final String backupConfFilePath = lens.getServerDir() + "/conf/drivers/hive/hive1/backup-hivedriver-site.xml"; + + private static final long SECONDS_IN_A_MINUTE = 60; + private String session1 = null, session2 = null; + //TODO : Read queue names from property file + private static String queue1 = "dwh", queue2 = "reports"; + + private static Logger logger = Logger.getLogger(ITThrottlingTests.class); + + @BeforeClass(alwaysRun = true) + public void initialize() throws Exception { + servLens = ServiceManagerHelper.init(); + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "10", + HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW"); + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + } + + @BeforeMethod(alwaysRun = true) + public void setUp(Method method) throws Exception { + logger.info("Test Name: " + method.getName()); + Util.runRemoteCommand("cp " + hiveDriverConf + " " + backupConfFilePath); + + sessionHandleString = sHelper.openSession(lens.getCurrentDB()); + session1 = sHelper.openSession("diff1", "diff1", lens.getCurrentDB()); + session2 = sHelper.openSession("diff2", "diff2", lens.getCurrentDB()); + sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); + sHelper.setAndValidateParam(session1, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); + sHelper.setAndValidateParam(session2, 
CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); + } + + @AfterMethod(alwaysRun = true) + public void afterMethod(Method method) throws Exception { + logger.info("Test Name: " + method.getName()); + qHelper.killQuery(null, "QUEUED", "all"); + qHelper.killQuery(null, "RUNNING", "all"); + qHelper.killQuery(null, "EXECUTED", "all"); + sHelper.closeSession(session1); + sHelper.closeSession(session2); + sHelper.closeSession(sessionHandleString); + + Util.runRemoteCommand("cp " + backupConfFilePath + " " + hiveDriverConf); + } + + @AfterClass(alwaysRun = true) + public void closeSession() throws Exception { + lens.restart(); + } + + + @Test(enabled = true) + public void testHiveThrottling() throws Exception { + + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "2"); + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + List<QueryHandle> handleList = new ArrayList<>(); + handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session2).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null).getData()); + + Thread.sleep(1000); + + List<QueryStatus> statusList = new ArrayList<>(); + for(QueryHandle handle : handleList){ + statusList.add(qHelper.getQueryStatus(handle)); + } + + Assert.assertEquals(statusList.get(0).getStatus(), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(1).getStatus(), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(2).getStatus(), QueryStatus.Status.QUEUED); + Assert.assertEquals(statusList.get(3).getStatus(), QueryStatus.Status.QUEUED); + + qHelper.waitForCompletion(handleList.get(0)); + Thread.sleep(100); + Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING); + } + + + @Test(enabled = true) + public void 
testHiveMaxConcurrentRandomQueryIngestion() throws Exception { + + long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds + int sleepTime = 3, maxConcurrent = 4; + List<QueryHandle> handleList = new ArrayList<QueryHandle>(); + + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, + String.valueOf(maxConcurrent)); + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + for (int i = 0; i < 5; i++) { + handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(JDBC_QUERY1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData()); + } + + Thread.sleep(50); + + List<QueryHandle> running = null, queued = null; + for (int t = 0; t < timeToWait; t = t + sleepTime) { + + running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); + queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); + logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); + + if (running.isEmpty() && queued.isEmpty()) { + break; + } + Assert.assertTrue(running.size() <= maxConcurrent); + + if (t % 30 == 0 && t < 200) { + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.JDBC_CUBE_QUERY).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData()); + } + TimeUnit.SECONDS.sleep(sleepTime); + } + + Assert.assertTrue(running.isEmpty()); + Assert.assertTrue(queued.isEmpty()); + + for(QueryHandle q : handleList){ + LensQuery lq = qHelper.waitForCompletion(q); + Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + } + } + + @Test(enabled = true) + public void testProrityMaxConcurrent() throws 
Exception { + + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5", + DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=2,VERY_LOW=1", HiveDriver.HS2_PRIORITY_RANGES, + "HIGH,7,NORMAL,30,LOW,90,VERY_LOW"); + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + /* First 3 are high priority queries, 2 of them will go to RUNNING, 3rd will be queued as there is a + threshold of 2 on HIGH priority. cost_95 queries are very_low priority ones, hence 1st will go to RUNNING, + other 1 is queued. cost_20 and cost_60 goes to running as they are normal and low priority queries and there + is no limit set for low and normal priority.Last cost_60 query goes to queue as, RUNNING query on this + driver has reached max concurrent threshold. + */ + + String[] queries = {COST_5, COST_5, COST_5, COST_95, COST_95, COST_20, COST_60, COST_60}; + QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, + QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, + QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, }; + + List<QueryHandle> handleList = new ArrayList<>(); + for (String query : queries){ + handleList.add((QueryHandle) qHelper.executeQuery(query).getData()); + } + + List<QueryStatus.Status> statusList = new ArrayList<>(); + for (QueryHandle handle : handleList){ + statusList.add(qHelper.getQueryStatus(handle).getStatus()); + } + + for (int i=0; i<statusList.size(); i++){ + Assert.assertEquals(statusList.get(i), expectedStatus[i]); + } + + qHelper.waitForCompletion(handleList.get(0)); + Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING); + } + + @Test(enabled = true) + public void prioritySumMoreThanMaxConcurrent() throws Exception { + + long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds + int sleepTime = 5, maxConcurrent = 5, lowConCurrent = 2, veryLowConcurrent = 1, highConcurrent = 4, 
+ normalConcurrent = 2; + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, + String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT, + "LOW=" + String.valueOf(lowConCurrent) + ",VERY_LOW=" + String.valueOf(veryLowConcurrent) + + ",NORMAL=" + String.valueOf(normalConcurrent) + ",HIGH=" + String.valueOf(highConcurrent)); + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + List<QueryHandle> handleList = new ArrayList<QueryHandle>(); + for (int i=1; i<=5; i++){ + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData()); + } + + List<QueryHandle> running=null, queued=null; + for (int t = 0; t < timeToWait; t = t + sleepTime) { + + running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); + queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); + logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); + + if (running.isEmpty() && queued.isEmpty()) { + break; + } + + Assert.assertTrue(running.size() <= maxConcurrent); + + int low = 0, veryLow = 0, high = 0, normal = 0; + for (QueryHandle qh : running) { + Priority p = qHelper.getLensQuery(sessionHandleString, qh).getPriority(); + Assert.assertNotNull(p); + switch (p) { + case HIGH: + high++; + break; + case NORMAL: + normal++; + break; + case LOW: + low++; + break; + case VERY_LOW: + veryLow++; + break; + default: + throw new Exception("Unexpected Priority"); + } + } + + Assert.assertTrue(low <= lowConCurrent); + Assert.assertTrue(veryLow <= veryLowConcurrent); + Assert.assertTrue(high <= highConcurrent); + Assert.assertTrue(normal <= normalConcurrent); + + 
TimeUnit.SECONDS.sleep(sleepTime); + } + + Assert.assertTrue(queued.isEmpty()); + Assert.assertTrue(running.isEmpty()); + + for (QueryHandle q: handleList){ + LensQuery lq = qHelper.waitForCompletion(q); + Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + } + } + + + @Test(enabled = true) + public void queueMaxConcurrent() throws Exception { + + int maxConcurrent = 3, queue1Count = 1, queue2Count = 2; + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, + String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT, + queue1 + "=" + String.valueOf(queue1Count) + "," + queue2 + "=" + String.valueOf(queue2Count)); + List<QueryHandle> handleList = new ArrayList<>(); + + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + + sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + + List<QueryStatus.Status> statusList = new ArrayList<>(); + for (QueryHandle handle : handleList){ + statusList.add(qHelper.getQueryStatus(handle).getStatus()); + } + + Assert.assertEquals(statusList.get(0), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(1), QueryStatus.Status.QUEUED); + Assert.assertEquals(statusList.get(2), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(3), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(4), QueryStatus.Status.QUEUED); + + qHelper.waitForCompletion(handleList.get(0)); + Assert.assertEquals(qHelper.getQueryStatus(handleList.get(1)).getStatus(), 
QueryStatus.Status.RUNNING); + + qHelper.waitForCompletion(handleList.get(2)); + Assert.assertEquals(qHelper.getQueryStatus(handleList.get(4)).getStatus(), QueryStatus.Status.RUNNING); + } + + // LENS-1027 + @Test(enabled = true) + public void queueDefaultThresholdConstraint() throws Exception { + + int maxConcurrent = 5, queue1Count = 1, queue2Count = 2; + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, + String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT, + "*=" + String.valueOf(queue1Count) + "," + queue2 + "=" + String.valueOf(queue2Count)); + List<QueryHandle> handleList = new ArrayList<>(); + + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + + sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); + + Thread.sleep(2000); + + List<QueryStatus.Status> statusList = new ArrayList<>(); + for (QueryHandle handle : handleList){ + statusList.add(qHelper.getQueryStatus(handle).getStatus()); + } + + Assert.assertEquals(statusList.get(0), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(1), QueryStatus.Status.QUEUED); + Assert.assertEquals(statusList.get(2), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(3), QueryStatus.Status.RUNNING); + Assert.assertEquals(statusList.get(4), QueryStatus.Status.QUEUED); + } + + + @Test(enabled = true) + public void enableQueueThrottlingWithExistingQueuedQueries() throws Exception { + + long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds + int sleepTime = 5, maxConcurrent 
= 4, queue1Concurrent = 1, queue2Concurrent = 2; + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, + String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT, + queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent)); + + sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); + sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); + + List<QueryHandle> handleList = new ArrayList<QueryHandle>(); + for (int i = 1; i <= 3; i++) { + handleList.add((QueryHandle)qHelper.executeQuery(COST_95).getData()); + handleList.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData()); + handleList.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData()); + } + + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + for (int i = 1; i <= 2; i++) { + handleList.add((QueryHandle)qHelper.executeQuery(COST_95).getData()); + handleList.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData()); + handleList.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData()); + } + + List<QueryHandle> running = null, queued = null; + for (int t = 0; t < timeToWait; t = t + sleepTime) { + + running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); + queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); + logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); + + if (running.isEmpty() && queued.isEmpty()) { + break; + } + Assert.assertTrue(running.size() <= maxConcurrent, "running query count is not less than max concurrent set" + + "running-count : " + running.size() + ", max-count : " + maxConcurrent); + + int queue1Count = 0, queue2Count = 0; + for (QueryHandle qh : running) { + String queue = 
qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties() + .get("mapreduce.job.queuename"); + Assert.assertNotNull(queue); + + if (queue.equals(queue1)) { + queue1Count++; + } else if (queue.equals(queue2)) { + queue2Count++; + } + } + + Assert.assertTrue(queue1Count <= queue1Concurrent, "queue1 count : " + queue1Count); + Assert.assertTrue(queue2Count <= queue2Concurrent, "queue2 count : " + queue2Count); + TimeUnit.SECONDS.sleep(sleepTime); + } + + Assert.assertTrue(running.isEmpty()); + Assert.assertTrue(queued.isEmpty()); + + for(QueryHandle q: handleList){ + LensQuery lq = qHelper.waitForCompletion(q); + Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + } + } + + + @Test(enabled = true) + public void queueAndPriorityMaxConcurrent() throws Exception { + + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5", + DriverConfig.PRIORITY_MAX_CONCURRENT, "LOW=2,VERY_LOW=1", + DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=2"); + + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); + sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); + + QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, + QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, + QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED, }; + + List<QueryHandle> handleList = new ArrayList<>(); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95, null, session1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95, null, session2).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session2).getData()); + handleList.add((QueryHandle) 
qHelper.executeQuery(COST_5, null, session2).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData()); + + List<QueryStatus> statusList = new ArrayList<>(); + for(QueryHandle handle: handleList){ + statusList.add(qHelper.getQueryStatus(handle)); + } + + for(int i=0; i<expectedStatus.length; i++){ + Assert.assertEquals(statusList.get(i).getStatus(), expectedStatus[i], "failed : query-" + i); + } + } + + + @Test(enabled = true) + public void queueAndPriorityMaxConcurrentMany() throws Exception { + + long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds + int sleepTime = 5, maxConcurrent = 5, queue1Concurrent = 1, queue2Concurrent = 2, priority1 = 2, priority2 = 1; + + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, + String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT, + "HIGH=" + String.valueOf(priority1) + ",NORMAL=" + String.valueOf(priority2), + DriverConfig.QUEUE_MAX_CONCURRENT, + queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent)); + + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); + sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); + + List<QueryHandle> handleList = new ArrayList<QueryHandle>(); + for (int i = 1; i <= 3; i++) { + handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_60, "", session2).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_20).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", 
session1).getData()); + handleList.add((QueryHandle) qHelper.executeQuery(COST_5, "", session2).getData()); + } + + List<QueryHandle> running = null, queued = null; + for (int t = 0; t < timeToWait; t = t + sleepTime) { + + running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); + queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); + logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); + + if (running.isEmpty() && queued.isEmpty()) { + break; + } + + Assert.assertTrue(running.size() <= maxConcurrent); + + int pCount1 = 0, pCount2 = 0, queue1Count = 0, queue2Count = 0; + for (QueryHandle qh : running) { + Priority priority = qHelper.getLensQuery(sessionHandleString, qh).getPriority(); + String queue = qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties() + .get("mapreduce.job.queuename"); + + Assert.assertNotNull(priority); + Assert.assertNotNull(queue); + + if (priority.equals(Priority.LOW)){ + pCount1++; + } else if (priority.equals(Priority.VERY_LOW)){ + pCount2++; + } + + if (queue.equals(queue1)){ + queue1Count++; + } else if (queue.equals(queue2)) { + queue2Count++; + } + } + + Assert.assertTrue(pCount1 <= priority1, "proirty-1 count : " + pCount1); + Assert.assertTrue(pCount2 <= priority2, "priority-2 count : " + pCount2); + Assert.assertTrue(queue1Count <= queue1Concurrent, "queue-1 count : " + queue1Count); + Assert.assertTrue(queue2Count <= queue2Concurrent, "queue-2 count : " + queue2Count); + + TimeUnit.SECONDS.sleep(sleepTime); + } + + Assert.assertTrue(queued.isEmpty()); + Assert.assertTrue(running.isEmpty()); + + for (QueryHandle q : handleList) { + LensQuery lq = qHelper.waitForCompletion(q); + Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + } + } + + /* + LENS-973. 
Scenario is mentioned in jira + */ + + @Test(enabled = true) + public void queueConstraintFailureOnRestart() throws Exception { + + List<QueryHandle> handleList = new ArrayList<QueryHandle>(); + + HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "3", + DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=3"); + Util.changeConfig(map, hiveDriverConf); + lens.restart(); + + String newSession = sHelper.openSession("user", "pwd", lens.getCurrentDB()); + sHelper.setAndValidateParam(newSession, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.SLEEP_QUERY, null, newSession).getData()); + + for(int i=0; i<2; i++){ + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY).getData()); + } + + sHelper.closeSession(newSession); + lens.restart(); + Assert.assertFalse(qHelper.getQueryStatus(handleList.get(0)).finished()); + + for(int i=0; i<6; i++){ + handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData()); + } + + for(QueryHandle handle: handleList){ + LensQuery lq = qHelper.waitForCompletion(handle); + Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/97fe26d3/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java ---------------------------------------------------------------------- diff --git a/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java b/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java deleted file mode 100644 index abf7263..0000000 --- a/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java +++ /dev/null @@ -1,604 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.lens.regression.throttling; - -import java.lang.reflect.Method; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.client.WebTarget; - -import org.apache.lens.api.Priority; -import org.apache.lens.api.query.LensQuery; -import org.apache.lens.api.query.QueryHandle; -import org.apache.lens.api.query.QueryStatus; -import org.apache.lens.cube.parse.CubeQueryConfUtil; -import org.apache.lens.driver.hive.HiveDriver; -import org.apache.lens.regression.core.constants.DriverConfig; -import org.apache.lens.regression.core.constants.QueryInventory; -import org.apache.lens.regression.core.helpers.ServiceManagerHelper; -import org.apache.lens.regression.core.testHelper.BaseTestClass; -import org.apache.lens.regression.util.Util; -import org.apache.lens.server.api.LensConfConstants; -import org.apache.lens.server.api.util.LensUtil; - -import org.apache.log4j.Logger; - -import org.testng.Assert; -import org.testng.annotations.*; - -public class Throttling extends BaseTestClass { - - WebTarget servLens; - String sessionHandleString; - - public static final String SLEEP_QUERY = QueryInventory.getSleepQuery("5"); - public static final String COST_95 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "5"); - 
public static final String COST_60 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_60"), "5"); - public static final String COST_30 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_30"), "5"); - public static final String COST_20 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_20"), "4"); - public static final String COST_10 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_10"), "4"); - public static final String COST_5 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "3"); - public static final String JDBC_QUERY1 = QueryInventory.getQueryFromInventory("JDBC.QUERY1"); - - private static String hiveDriver = "hive/hive1"; - private final String hiveDriverConf = lens.getServerDir() + "/conf/drivers/hive/hive1/hivedriver-site.xml"; - private final String backupConfFilePath = lens.getServerDir() + "/conf/drivers/hive/hive1/backup-hivedriver-site.xml"; - - private static final long SECONDS_IN_A_MINUTE = 60; - private String session1 = null, session2 = null; - //TODO : Read queue names from property file - private static String queue1 = "dwh", queue2 = "reports"; - - private static Logger logger = Logger.getLogger(Throttling.class); - - @BeforeClass(alwaysRun = true) - public void initialize() throws Exception { - servLens = ServiceManagerHelper.init(); - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "10", - HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW"); - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - } - - @BeforeMethod(alwaysRun = true) - public void setUp(Method method) throws Exception { - logger.info("Test Name: " + method.getName()); - Util.runRemoteCommand("cp " + hiveDriverConf + " " + backupConfFilePath); - - sessionHandleString = sHelper.openSession(lens.getCurrentDB()); - session1 = sHelper.openSession("diff1", "diff1", lens.getCurrentDB()); - session2 = 
sHelper.openSession("diff2", "diff2", lens.getCurrentDB()); - sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); - sHelper.setAndValidateParam(session1, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); - sHelper.setAndValidateParam(session2, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false"); - } - - @AfterMethod(alwaysRun = true) - public void afterMethod(Method method) throws Exception { - logger.info("Test Name: " + method.getName()); - qHelper.killQuery(null, "QUEUED", "all"); - qHelper.killQuery(null, "RUNNING", "all"); - qHelper.killQuery(null, "EXECUTED", "all"); - sHelper.closeSession(session1); - sHelper.closeSession(session2); - sHelper.closeSession(sessionHandleString); - - Util.runRemoteCommand("cp " + backupConfFilePath + " " + hiveDriverConf); - } - - @AfterClass(alwaysRun = false) - public void closeSession() throws Exception { - lens.restart(); - } - - - @Test(enabled = true) - public void testHiveThrottling() throws Exception { - - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "2"); - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - List<QueryHandle> handleList = new ArrayList<>(); - handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session2).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null).getData()); - - Thread.sleep(1000); - - List<QueryStatus> statusList = new ArrayList<>(); - for(QueryHandle handle : handleList){ - statusList.add(qHelper.getQueryStatus(handle)); - } - - Assert.assertEquals(statusList.get(0).getStatus(), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(1).getStatus(), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(2).getStatus(), QueryStatus.Status.QUEUED); - 
Assert.assertEquals(statusList.get(3).getStatus(), QueryStatus.Status.QUEUED); - - qHelper.waitForCompletion(handleList.get(0)); - Thread.sleep(100); - Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING); - } - - - @Test(enabled = true) - public void testHiveMaxConcurrentRandomQueryIngestion() throws Exception { - - long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds - int sleepTime = 3, maxConcurrent = 4; - List<QueryHandle> handleList = new ArrayList<QueryHandle>(); - - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, - String.valueOf(maxConcurrent)); - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - for (int i = 0; i < 5; i++) { - handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(JDBC_QUERY1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData()); - } - - Thread.sleep(50); - - List<QueryHandle> running = null, queued = null; - for (int t = 0; t < timeToWait; t = t + sleepTime) { - - running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); - queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); - logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); - - if (running.isEmpty() && queued.isEmpty()) { - break; - } - Assert.assertTrue(running.size() <= maxConcurrent); - - if (t % 30 == 0 && t < 200) { - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.JDBC_CUBE_QUERY).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData()); - } - TimeUnit.SECONDS.sleep(sleepTime); - } - - 
Assert.assertTrue(running.isEmpty()); - Assert.assertTrue(queued.isEmpty()); - - for(QueryHandle q : handleList){ - LensQuery lq = qHelper.waitForCompletion(q); - Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - } - } - - @Test(enabled = true) - public void testProrityMaxConcurrent() throws Exception { - - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5", - DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=2,VERY_LOW=1", HiveDriver.HS2_PRIORITY_RANGES, - "HIGH,7,NORMAL,30,LOW,90,VERY_LOW"); - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - /* First 3 are high priority queries, 2 of them will go to RUNNING, 3rd will be queued as there is a - threshold of 2 on HIGH priority. cost_95 queries are very_low priority ones, hence 1st will go to RUNNING, - other 1 is queued. cost_20 and cost_60 goes to running as they are normal and low priority queries and there - is no limit set for low and normal priority.Last cost_60 query goes to queue as, RUNNING query on this - driver has reached max concurrent threshold. 
- */ - - String[] queries = {COST_5, COST_5, COST_5, COST_95, COST_95, COST_20, COST_60, COST_60}; - QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, - QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, - QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, }; - - List<QueryHandle> handleList = new ArrayList<>(); - for (String query : queries){ - handleList.add((QueryHandle) qHelper.executeQuery(query).getData()); - } - - List<QueryStatus.Status> statusList = new ArrayList<>(); - for (QueryHandle handle : handleList){ - statusList.add(qHelper.getQueryStatus(handle).getStatus()); - } - - for (int i=0; i<statusList.size(); i++){ - Assert.assertEquals(statusList.get(i), expectedStatus[i]); - } - - qHelper.waitForCompletion(handleList.get(0)); - Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING); - } - - @Test(enabled = true) - public void prioritySumMoreThanMaxConcurrent() throws Exception { - - long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds - int sleepTime = 5, maxConcurrent = 5, lowConCurrent = 2, veryLowConcurrent = 1, highConcurrent = 4, - normalConcurrent = 2; - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, - String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT, - "LOW=" + String.valueOf(lowConCurrent) + ",VERY_LOW=" + String.valueOf(veryLowConcurrent) - + ",NORMAL=" + String.valueOf(normalConcurrent) + ",HIGH=" + String.valueOf(highConcurrent)); - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - List<QueryHandle> handleList = new ArrayList<QueryHandle>(); - for (int i=1; i<=5; i++){ - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData()); 
- handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData()); - } - - List<QueryHandle> running=null, queued=null; - for (int t = 0; t < timeToWait; t = t + sleepTime) { - - running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); - queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); - logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); - - if (running.isEmpty() && queued.isEmpty()) { - break; - } - - Assert.assertTrue(running.size() <= maxConcurrent); - - int low = 0, veryLow = 0, high = 0, normal = 0; - for (QueryHandle qh : running) { - Priority p = qHelper.getLensQuery(sessionHandleString, qh).getPriority(); - Assert.assertNotNull(p); - switch (p) { - case HIGH: - high++; - break; - case NORMAL: - normal++; - break; - case LOW: - low++; - break; - case VERY_LOW: - veryLow++; - break; - default: - throw new Exception("Unexpected Priority"); - } - } - - Assert.assertTrue(low <= lowConCurrent); - Assert.assertTrue(veryLow <= veryLowConcurrent); - Assert.assertTrue(high <= highConcurrent); - Assert.assertTrue(normal <= normalConcurrent); - - TimeUnit.SECONDS.sleep(sleepTime); - } - - Assert.assertTrue(queued.isEmpty()); - Assert.assertTrue(running.isEmpty()); - - for (QueryHandle q: handleList){ - LensQuery lq = qHelper.waitForCompletion(q); - Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - } - } - - - @Test(enabled = true) - public void queueMaxConcurrent() throws Exception { - - int maxConcurrent = 3, queue1Count = 1, queue2Count = 2; - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, - String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT, - queue1 + "=" + String.valueOf(queue1Count) + "," + queue2 + "=" + String.valueOf(queue2Count)); - List<QueryHandle> handleList = new ArrayList<>(); - - Util.changeConfig(map, 
hiveDriverConf); - lens.restart(); - - sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - - sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - - List<QueryStatus.Status> statusList = new ArrayList<>(); - for (QueryHandle handle : handleList){ - statusList.add(qHelper.getQueryStatus(handle).getStatus()); - } - - Assert.assertEquals(statusList.get(0), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(1), QueryStatus.Status.QUEUED); - Assert.assertEquals(statusList.get(2), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(3), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(4), QueryStatus.Status.QUEUED); - - qHelper.waitForCompletion(handleList.get(0)); - Assert.assertEquals(qHelper.getQueryStatus(handleList.get(1)).getStatus(), QueryStatus.Status.RUNNING); - - qHelper.waitForCompletion(handleList.get(2)); - Assert.assertEquals(qHelper.getQueryStatus(handleList.get(4)).getStatus(), QueryStatus.Status.RUNNING); - } - - // LENS-1027 - @Test(enabled = true) - public void queueDefaultThresholdConstraint() throws Exception { - - int maxConcurrent = 5, queue1Count = 1, queue2Count = 2; - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, - String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT, - "*=" + String.valueOf(queue1Count) + "," + queue2 + "=" + String.valueOf(queue2Count)); - List<QueryHandle> handleList = new ArrayList<>(); - - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - 
sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - - sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData()); - - Thread.sleep(2000); - - List<QueryStatus.Status> statusList = new ArrayList<>(); - for (QueryHandle handle : handleList){ - statusList.add(qHelper.getQueryStatus(handle).getStatus()); - } - - Assert.assertEquals(statusList.get(0), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(1), QueryStatus.Status.QUEUED); - Assert.assertEquals(statusList.get(2), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(3), QueryStatus.Status.RUNNING); - Assert.assertEquals(statusList.get(4), QueryStatus.Status.QUEUED); - } - - /** Submits three rounds of queries (default session, session1 bound to queue1, session2 bound to queue2) BEFORE any limit is configured, then writes MAX_CONCURRENT_QUERIES and QUEUE_MAX_CONCURRENT to hiveDriverConf and restarts the server, and submits two more rounds. It then polls every sleepTime seconds, for at most timeToWait seconds, asserting that the RUNNING list for hiveDriver never exceeds maxConcurrent and that the running queries per queue (read from the "mapreduce.job.queuename" query property) never exceed queue1Concurrent / queue2Concurrent. The loop stops early once both the RUNNING and QUEUED lists are empty; afterwards both lists must be empty and every submitted query must finish SUCCESSFUL. */ - @Test(enabled = true) - public void enableQueueThrottlingWithExistingQueuedQueries() throws Exception { - - long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds - int sleepTime = 5, maxConcurrent = 4, queue1Concurrent = 1, queue2Concurrent = 2; - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, - String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT, - queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent)); - - sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); - sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); - - List<QueryHandle> handleList = new ArrayList<QueryHandle>(); - for (int i = 1; i <= 3; i++) { - handleList.add((QueryHandle)qHelper.executeQuery(COST_95).getData()); - 
handleList.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData()); - handleList.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData()); - } - - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - for (int i = 1; i <= 2; i++) { - handleList.add((QueryHandle)qHelper.executeQuery(COST_95).getData()); - handleList.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData()); - handleList.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData()); - } - - List<QueryHandle> running = null, queued = null; - for (int t = 0; t < timeToWait; t = t + sleepTime) { - - running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); - queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); - logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); - - if (running.isEmpty() && queued.isEmpty()) { - break; - } - Assert.assertTrue(running.size() <= maxConcurrent); - - int queue1Count = 0, queue2Count = 0; - for (QueryHandle qh : running) { - String queue = qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties() - .get("mapreduce.job.queuename"); - Assert.assertNotNull(queue); - - if (queue.equals(queue1)) { - queue1Count++; - } else if (queue.equals(queue2)) { - queue2Count++; - } - } - - Assert.assertTrue(queue1Count <= queue1Concurrent, "queue1 count : " + queue1Count); - Assert.assertTrue(queue2Count <= queue2Concurrent, "queue2 count : " + queue2Count); - TimeUnit.SECONDS.sleep(sleepTime); - } - - Assert.assertTrue(running.isEmpty()); - Assert.assertTrue(queued.isEmpty()); - - for(QueryHandle q: handleList){ - LensQuery lq = qHelper.waitForCompletion(q); - Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - } - } - - /** Writes MAX_CONCURRENT_QUERIES=5, PRIORITY_MAX_CONCURRENT "LOW=2,VERY_LOW=1" and QUEUE_MAX_CONCURRENT (queue1=1, queue2=2) to hiveDriverConf, restarts the server, binds session1 to queue1 and session2 to queue2, submits eight queries of differing cost, and asserts that the status of the i-th submitted query equals expectedStatus[i]. NOTE(review): the statuses are read right after submission with no wait or polling, so the expected RUNNING/QUEUED pattern presumably relies on launch timing; confirm this is not flaky. */ - @Test(enabled = true) - public void queueAndPriorityMaxConcurrent() throws Exception { - - HashMap<String, 
String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5", - DriverConfig.PRIORITY_MAX_CONCURRENT, "LOW=2,VERY_LOW=1", - DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=2"); - - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); - sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); - - QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, - QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, - QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED, }; - - List<QueryHandle> handleList = new ArrayList<>(); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95, null, session1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95, null, session2).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session2).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData()); - - List<QueryStatus> statusList = new ArrayList<>(); - for(QueryHandle handle: handleList){ - statusList.add(qHelper.getQueryStatus(handle)); - } - - for(int i=0; i<expectedStatus.length; i++){ - Assert.assertEquals(statusList.get(i).getStatus(), expectedStatus[i], "failed : query-" + i); - } - } - - /** Writes an overall limit (maxConcurrent), the priority limits "HIGH=" priority1 ",NORMAL=" priority2, and per-queue limits for queue1 and queue2 to hiveDriverConf, restarts the server, and submits three rounds of six queries across the default session, session1 (queue1) and session2 (queue2). It then polls every sleepTime seconds, for at most timeToWait seconds, asserting on each pass that the RUNNING list for hiveDriver does not exceed maxConcurrent and that the per-priority and per-queue counts of running queries stay within their limits. Afterwards the QUEUED and RUNNING lists must be empty and every submitted query must finish SUCCESSFUL. */ - @Test(enabled = true) - public void queueAndPriorityMaxConcurrentMany() throws Exception { - - long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds - int sleepTime = 5, maxConcurrent = 5, queue1Concurrent = 1, 
queue2Concurrent = 2, priority1 = 2, priority2 = 1; - - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, - String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT, - "HIGH=" + String.valueOf(priority1) + ",NORMAL=" + String.valueOf(priority2), - DriverConfig.QUEUE_MAX_CONCURRENT, - queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent)); - - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1); - sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); - - List<QueryHandle> handleList = new ArrayList<QueryHandle>(); - for (int i = 1; i <= 3; i++) { - handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_60, "", session2).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_20).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", session1).getData()); - handleList.add((QueryHandle) qHelper.executeQuery(COST_5, "", session2).getData()); - } - - List<QueryHandle> running = null, queued = null; - for (int t = 0; t < timeToWait; t = t + sleepTime) { - - running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver); - queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver); - logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size()); - - if (running.isEmpty() && queued.isEmpty()) { - break; - } - - Assert.assertTrue(running.size() <= maxConcurrent); - - int pCount1 = 0, pCount2 = 0, queue1Count = 0, queue2Count = 0; - for (QueryHandle qh : running) { - Priority priority = qHelper.getLensQuery(sessionHandleString, 
qh).getPriority(); - String queue = qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties() - .get("mapreduce.job.queuename"); - - Assert.assertNotNull(priority); - Assert.assertNotNull(queue); - /* NOTE(review): the limits written to the config in this test are keyed HIGH and NORMAL, but the counters below track Priority.LOW and Priority.VERY_LOW, so the priority1/priority2 assertions may not exercise the configured limits; confirm which pair is intended. The "proirty-1" spelling in the first assertion message further down is a runtime string and is left unchanged. */ - if (priority.equals(Priority.LOW)){ - pCount1++; - } else if (priority.equals(Priority.VERY_LOW)){ - pCount2++; - } - - if (queue.equals(queue1)){ - queue1Count++; - } else if (queue.equals(queue2)) { - queue2Count++; - } - } - - Assert.assertTrue(pCount1 <= priority1, "proirty-1 count : " + pCount1); - Assert.assertTrue(pCount2 <= priority2, "priority-2 count : " + pCount2); - Assert.assertTrue(queue1Count <= queue1Concurrent, "queue-1 count : " + queue1Count); - Assert.assertTrue(queue2Count <= queue2Concurrent, "queue-2 count : " + queue2Count); - - TimeUnit.SECONDS.sleep(sleepTime); - } - - Assert.assertTrue(queued.isEmpty()); - Assert.assertTrue(running.isEmpty()); - - for (QueryHandle q : handleList) { - LensQuery lq = qHelper.waitForCompletion(q); - Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - } - } - - /* - LENS-973. 
The scenario is described in that JIRA issue. - Steps performed by the test below: write MAX_CONCURRENT_QUERIES=3 and QUEUE_MAX_CONCURRENT (queue1=1, queue2=3) to hiveDriverConf and restart; open a new session bound to queue2 and submit SLEEP_QUERY from it, then submit two HIVE_CUBE_QUERY runs from the default session; close the new session and restart again; assert that the first (sleep) query has not finished; submit six HIVE_DIM_QUERY runs; finally wait for every submitted query and expect status SUCCESSFUL. - */ - - @Test(enabled = true) - public void queueConstraintFailureOnRestart() throws Exception { - - List<QueryHandle> handleList = new ArrayList<QueryHandle>(); - - HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "3", - DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=3"); - Util.changeConfig(map, hiveDriverConf); - lens.restart(); - - String newSession = sHelper.openSession("user", "pwd", lens.getCurrentDB()); - sHelper.setAndValidateParam(newSession, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2); - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.SLEEP_QUERY, null, newSession).getData()); - - for(int i=0; i<2; i++){ - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY).getData()); - } - - sHelper.closeSession(newSession); - lens.restart(); - Assert.assertFalse(qHelper.getQueryStatus(handleList.get(0)).finished()); - - for(int i=0; i<6; i++){ - handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData()); - } - - for(QueryHandle handle: handleList){ - LensQuery lq = qHelper.waitForCompletion(handle); - Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - } - } -}