Github user bitblender commented on a diff in the pull request:
https://github.com/apache/drill/pull/921#discussion_r149544267
--- Diff:
exec/java-exec/src/test/java/org/apache/drill/test/TestGracefulShutdown.java ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.test;
+
+import ch.qos.logback.classic.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.server.Drillbit;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.omg.PortableServer.THREAD_POLICY_ID;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+public class TestGracefulShutdown {
+
+ @BeforeClass
+ public static void setUpTestData() {
+ for( int i = 0; i < 1000; i++) {
+ setupFile(i);
+ }
+ }
+
+
+ public static final Properties WEBSERVER_CONFIGURATION = new
Properties() {
+ {
+ put(ExecConstants.HTTP_ENABLE, true);
+ }
+ };
+
+ public FixtureBuilder enableWebServer(FixtureBuilder builder) {
+ Properties props = new Properties();
+ props.putAll(WEBSERVER_CONFIGURATION);
+ builder.configBuilder.configProps(props);
+ return builder;
+ }
+
+
+ /*
+ Start multiple drillbits and then shutdown a drillbit. Query the online
+ endpoints and check if the drillbit still exists.
+ */
+ @Test
+ public void testOnlineEndPoints() throws Exception {
+
+ String[] drillbits = {"db1" ,"db2","db3", "db4", "db5", "db6"};
+ FixtureBuilder builder =
ClusterFixture.builder().withBits(drillbits).withLocalZk();
+
+
+ try ( ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+
+ Drillbit drillbit = cluster.drillbit("db2");
+ DrillbitEndpoint drillbitEndpoint =
drillbit.getRegistrationHandle().getEndPoint();
+ int grace_period =
drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ cluster.close_drillbit("db2");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ //wait for graceperiod
+ Thread.sleep(grace_period);
+ Collection<DrillbitEndpoint> drillbitEndpoints =
cluster.drillbit().getContext()
+ .getClusterCoordinator()
+ .getOnlineEndPoints();
+ Assert.assertFalse(drillbitEndpoints.contains(drillbitEndpoint));
+ }
+ }
+ /*
+ Test if the drillbit transitions from ONLINE state when a shutdown
+ request is initiated
+ */
+ @Test
+ public void testStateChange() throws Exception {
+
+ String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
+ FixtureBuilder builder =
ClusterFixture.builder().withBits(drillbits).withLocalZk();
+
+ try ( ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ Drillbit drillbit = cluster.drillbit("db2");
+ int grace_period =
drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
+ DrillbitEndpoint drillbitEndpoint =
drillbit.getRegistrationHandle().getEndPoint();
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ cluster.close_drillbit("db2");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ Thread.sleep(grace_period);
+ Collection<DrillbitEndpoint> drillbitEndpoints =
cluster.drillbit().getContext()
+ .getClusterCoordinator()
+ .getAvailableEndpoints();
+ for (DrillbitEndpoint dbEndpoint : drillbitEndpoints) {
+ if(drillbitEndpoint.getAddress().equals(dbEndpoint.getAddress())
&& drillbitEndpoint.getUserPort() == dbEndpoint.getUserPort()) {
+
assertNotEquals(dbEndpoint.getState(),DrillbitEndpoint.State.ONLINE);
+ }
+ }
+ }
+ }
+
+ /*
+ Test shutdown through RestApi
+ */
+ @Test
+ public void testRestApi() throws Exception {
+
+ String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
+ FixtureBuilder builder =
ClusterFixture.bareBuilder().withBits(drillbits).withLocalZk();
+ builder = enableWebServer(builder);
+ QueryBuilder.QuerySummaryFuture listener;
+ final String sql = "SELECT * FROM
dfs.`/tmp/drill-test/gracefulShutdown/` ORDER BY employee_id";
+ try ( ClusterFixture cluster = builder.build();
+ final ClientFixture client = cluster.clientFixture()) {
+ Drillbit drillbit = cluster.drillbit("db1");
+ int port =
drillbit.getContext().getConfig().getInt("drill.exec.http.port");
+ int grace_period =
drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
+ listener = client.queryBuilder().sql(sql).futureSummary();
+ Thread.sleep(10000);
+ while( port < 8052) {
+ URL url = new URL("http://localhost:"+port+"/graceful_shutdown");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ if (conn.getResponseCode() != 200) {
+ throw new RuntimeException("Failed : HTTP error code : "
+ + conn.getResponseCode());
+ }
+ port++;
+ }
+ Thread.sleep(grace_period);
+ Collection<DrillbitEndpoint> drillbitEndpoints =
cluster.drillbit().getContext()
+ .getClusterCoordinator()
+ .getOnlineEndPoints();
+ while(!listener.isDone()) {
+ Thread.sleep(10);
+ }
+ Assert.assertTrue(listener.isDone());
+ Assert.assertEquals(1,drillbitEndpoints.size());
+ }
+ }
+
+ @Test
+ public void testShutdown() throws Exception {
+
+
+ String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
+ FixtureBuilder builder =
ClusterFixture.bareBuilder().withBits(drillbits).withLocalZk();
+ final String sql = "SELECT * FROM
dfs.`/tmp/drill-test/gracefulShutdown/`";
+ QueryBuilder.QuerySummaryFuture listener;
+
+ try (ClusterFixture cluster = builder.build();
+ final ClientFixture client = cluster.clientFixture()) {
+ listener = client.queryBuilder().sql(sql).futureSummary();
+ Thread.sleep(10000);
+ shutdown(cluster,"db2");
+ shutdown(cluster,"db4");
+ shutdown(cluster,"db5");
+ shutdown(cluster,"db1");
+ while(!listener.isDone()) {
+ Thread.sleep(10);
+ }
+ final QueryBuilder.QuerySummary querySummary = listener.get();
+ Assert.assertEquals( QueryState.COMPLETED,
querySummary.finalState());
+ }
+ }
+
+ public void shutdown(final ClusterFixture cluster, final String db) {
+ new Thread(new Runnable() {
+ public void run() {
+ try {
+ Thread.currentThread().setName( db);
+ cluster.close_drillbit(db);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+ }
+
+ /*
+ Test default shutdown through RestApi
+ */
+ @Test
+ public void testRestApiShutdown() throws Exception {
+
+ String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
+ FixtureBuilder builder =
ClusterFixture.bareBuilder().withBits(drillbits).withLocalZk();
+ builder = enableWebServer(builder);
+ QueryBuilder.QuerySummaryFuture listener;
+ final String sql = "SELECT * FROM
dfs.`/tmp/drill-test/gracefulShutdown/` ORDER BY employee_id";
+ try ( ClusterFixture cluster = builder.build();
+ final ClientFixture client = cluster.clientFixture()) {
+ Drillbit drillbit = cluster.drillbit("db1");
+ int port =
drillbit.getContext().getConfig().getInt("drill.exec.http.port");
+ int grace_period =
drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
+ listener = client.queryBuilder().sql(sql).futureSummary();
+ Thread.sleep(10000);
+ while( port < 8052) {
+ URL url = new URL("http://localhost:"+port+"/shutdown");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ if (conn.getResponseCode() != 200) {
+ throw new RuntimeException("Failed : HTTP error code : "
+ + conn.getResponseCode());
+ }
+ port++;
+ }
+ Thread.sleep(grace_period);
+ Thread.sleep(5000);
+ Collection<DrillbitEndpoint> drillbitEndpoints =
cluster.drillbit().getContext()
+ .getClusterCoordinator().getAvailableEndpoints();
+// .getOnlineEndPoints();
+ while(!listener.isDone()) {
+ Thread.sleep(10);
+ }
+ Assert.assertTrue(listener.isDone());
+ Assert.assertEquals(1,drillbitEndpoints.size());
+ }
+ }
+
+
+ /*
+ Test putting drillbit to quiescent mode through RestApi
+ */
+ @Test
+ public void testRestApiQuiescentMode() throws Exception {
+
+ String[] drillbits = {"db1" ,"db2", "db3", "db4", "db5", "db6"};
+ FixtureBuilder builder =
ClusterFixture.bareBuilder().withBits(drillbits).withLocalZk();
+ builder = enableWebServer(builder);
+ QueryBuilder.QuerySummaryFuture listener;
+ final String sql = "SELECT * FROM
dfs.`/tmp/drill-test/gracefulShutdown/` ORDER BY employee_id";
+ try ( ClusterFixture cluster = builder.build();
+ final ClientFixture client = cluster.clientFixture()) {
+ Drillbit drillbit = cluster.drillbit("db1");
+ DrillbitEndpoint drillbitEndpoint =
drillbit.getRegistrationHandle().getEndPoint();
+ int port =
drillbit.getContext().getConfig().getInt("drill.exec.http.port");
+ int grace_period =
drillbit.getContext().getConfig().getInt("drill.exec.grace_period");
+ listener = client.queryBuilder().sql(sql).futureSummary();
+ Thread.sleep(10000);
+ URL url = new URL("http://localhost:"+port+"/quiescent");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ if (conn.getResponseCode() != 200) {
+ throw new RuntimeException("Failed : HTTP error code : "
+ + conn.getResponseCode());
+ }
+ Thread.sleep(grace_period);
+ Collection<DrillbitEndpoint> drillbitEndpoints =
cluster.drillbit().getContext()
+ .getClusterCoordinator().getOnlineEndPoints();
+ while(!listener.isDone()) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertTrue(listener.isDone());
+ Assert.assertFalse(drillbitEndpoints.contains(drillbitEndpoint));
+ }
+ }
+
+ private static void setupFile(int file_num) {
+ File destFile = new File(
"/tmp/drill-test/gracefulShutdown/employee"+file_num+".json" );
+ destFile.getParentFile().mkdirs();
+ try (PrintWriter out = new PrintWriter(new FileWriter(destFile))) {
+ out.println("{\"employee_id\":1,\"full_name\":\"Sheri
Nowmer\",\"first_name\":\"Sheri\",\"last_name\":\"Nowmer\",\"position_id\":1,\"position_title\":\"President\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1961-08-26\",\"hire_date\":\"1994-12-01
00:00:00.0\",\"end_date\":null,\"salary\":80000.0000,\"supervisor_id\":0,\"education_level\":\"Graduate
Degree\",\"marital_status\":\"S\",\"gender\":\"F\",\"management_role\":\"Senior
Management\"}\n" +
+ "{\"employee_id\":2,\"full_name\":\"Derrick
Whelply\",\"first_name\":\"Derrick\",\"last_name\":\"Whelply\",\"position_id\":2,\"position_title\":\"VP
Country
Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1915-07-03\",\"hire_date\":\"1994-12-01
00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate
Degree\",\"marital_status\":\"M\",\"gender\":\"M\",\"management_role\":\"Senior
Management\"}\n" +
+ "{\"employee_id\":4,\"full_name\":\"Michael
Spence\",\"first_name\":\"Michael\",\"last_name\":\"Spence\",\"position_id\":2,\"position_title\":\"VP
Country
Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1969-06-20\",\"hire_date\":\"1998-01-01
00:00:00.0\",\"end_date\":null,\"salary\":40000.0000,\"supervisor_id\":1,\"education_level\":\"Graduate
Degree\",\"marital_status\":\"S\",\"gender\":\"M\",\"management_role\":\"Senior
Management\"}\n" +
+ "{\"employee_id\":5,\"full_name\":\"Maya
Gutierrez\",\"first_name\":\"Maya\",\"last_name\":\"Gutierrez\",\"position_id\":2,\"position_title\":\"VP
Country
Manager\",\"store_id\":0,\"department_id\":1,\"birth_date\":\"1951-05-10\",\"hire_date\":\"1998-01-01
00:00:00.0\",\"end_date\":null,\"salary\":35000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors
Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior
Management\"}\n" +
+ "{\"employee_id\":6,\"full_name\":\"Roberta
Damstra\",\"first_name\":\"Roberta\",\"last_name\":\"Damstra\",\"position_id\":3,\"position_title\":\"VP
Information
Systems\",\"store_id\":0,\"department_id\":2,\"birth_date\":\"1942-10-08\",\"hire_date\":\"1994-12-01
00:00:00.0\",\"end_date\":null,\"salary\":25000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors
Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior
Management\"}\n" +
+ "{\"employee_id\":7,\"full_name\":\"Rebecca
Kanagaki\",\"first_name\":\"Rebecca\",\"last_name\":\"Kanagaki\",\"position_id\":4,\"position_title\":\"VP
Human
Resources\",\"store_id\":0,\"department_id\":3,\"birth_date\":\"1949-03-27\",\"hire_date\":\"1994-12-01
00:00:00.0\",\"end_date\":null,\"salary\":15000.0000,\"supervisor_id\":1,\"education_level\":\"Bachelors
Degree\",\"marital_status\":\"M\",\"gender\":\"F\",\"management_role\":\"Senior
Management\"}\n");
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void cleanUpTestData() {
+ try {
+ File testData = new File("/tmp/drill-test/gracefulShutdown/");
+ FileUtils.cleanDirectory(testData);
--- End diff --
Tim's pending changes will make it easy to deal with but meanwhile you can
use Files.createTempFile()
---