This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 541380d DRILL-7983: Add a REST API to support the get running or
completed profiles (#2353)
541380d is described below
commit 541380db4e863bc908c7debb2af19edc97f5d0b7
Author: CuteKittyhoho <[email protected]>
AuthorDate: Thu Dec 9 16:55:03 2021 +0800
DRILL-7983: Add a REST API to support the get running or completed profiles
(#2353)
---
.../exec/server/rest/profile/ProfileResources.java | 145 +++++++++++++++-----
.../drill/exec/server/rest/TestQueryProfiles.java | 148 +++++++++++++++++++++
2 files changed, 261 insertions(+), 32 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 127b2bb..391d16e 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -36,6 +36,7 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
@@ -217,15 +218,37 @@ public class ProfileResources {
}
@XmlRootElement
- public class QProfiles {
- private final List<ProfileInfo> runningQueries;
- private final List<ProfileInfo> finishedQueries;
+ public class QProfilesBase {
private final List<String> errors;
+ public QProfilesBase(List<String> errors) {
+ this.errors = errors;
+ }
+
+ public List<String> getErrors() {
+ return errors;
+ }
+
+ public int getMaxFetchedQueries() {
+ return
work.getContext().getConfig().getInt(ExecConstants.HTTP_MAX_PROFILES);
+ }
+
+ public String getQueriesPerPage() {
+ List<Integer> queriesPerPageOptions =
work.getContext().getConfig().getIntList(ExecConstants.HTTP_PROFILES_PER_PAGE);
+ Collections.sort(queriesPerPageOptions);
+ return Joiner.on(",").join(queriesPerPageOptions);
+ }
+ }
+
+ @XmlRootElement
+ public class QProfiles extends QProfilesBase {
+ private final List<ProfileInfo> runningQueries;
+ private final List<ProfileInfo> finishedQueries;
+
public QProfiles(List<ProfileInfo> runningQueries, List<ProfileInfo>
finishedQueries, List<String> errors) {
+ super(errors);
this.runningQueries = runningQueries;
this.finishedQueries = finishedQueries;
- this.errors = errors;
}
public List<ProfileInfo> getRunningQueries() {
@@ -235,18 +258,34 @@ public class ProfileResources {
public List<ProfileInfo> getFinishedQueries() {
return finishedQueries;
}
+ }
- public int getMaxFetchedQueries() {
- return
work.getContext().getConfig().getInt(ExecConstants.HTTP_MAX_PROFILES);
+ @XmlRootElement
+ public class QProfilesRunning extends QProfilesBase {
+ private final List<ProfileInfo> runningQueries;
+
+ public QProfilesRunning(List<ProfileInfo> runningQueries,List<String>
errors) {
+ super(errors);
+ this.runningQueries = runningQueries;
}
- public String getQueriesPerPage() {
- List<Integer> queriesPerPageOptions =
work.getContext().getConfig().getIntList(ExecConstants.HTTP_PROFILES_PER_PAGE);
- Collections.sort(queriesPerPageOptions);
- return Joiner.on(",").join(queriesPerPageOptions);
+ public List<ProfileInfo> getRunningQueries() {
+ return runningQueries;
}
+ }
- public List<String> getErrors() { return errors; }
+ /**
+  * Response entity that lists only the completed (finished) query profiles.
+  * The error list and the paging-related getters come from
+  * {@link QProfilesBase}.
+  */
+ @XmlRootElement
+ public class QProfilesCompleted extends QProfilesBase {
+   private final List<ProfileInfo> finishedQueries;
+
+   /**
+    * @param finishedQueries profiles of the finished queries to expose
+    * @param errors messages collected while the profiles were being read
+    */
+   public QProfilesCompleted(List<ProfileInfo> finishedQueries, List<String> errors) {
+     super(errors);
+     this.finishedQueries = finishedQueries;
+   }
+
+   /** @return the finished query profiles handed to the constructor */
+   public List<ProfileInfo> getFinishedQueries() {
+     return finishedQueries;
+   }
}
//max Param to cap listing of profiles
@@ -259,13 +298,43 @@ public class ProfileResources {
@Path("/profiles.json")
@Produces(MediaType.APPLICATION_JSON)
public Response getProfilesJSON(@Context UriInfo uriInfo) {
+ QProfilesRunning running_results =
(QProfilesRunning)getRunningProfilesJSON(uriInfo).getEntity();
+ QProfilesCompleted completed_results =
(QProfilesCompleted)getCompletedProfilesJSON(uriInfo).getEntity();
+ final List<String> total_errors = Lists.newArrayList();
+ total_errors.addAll(running_results.getErrors());
+ total_errors.addAll(completed_results.getErrors());
+
+ QProfiles final_results = new QProfiles(running_results.runningQueries,
completed_results.finishedQueries, total_errors);
+ return total_errors.size() == 0
+ ? Response.ok().entity(final_results).build()
+ : Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(final_results)
+ .build();
+ }
+
+ /**
+  * Lists query profiles filtered by the {@code status} query parameter.
+  *
+  * <ul>
+  *   <li>{@code running} - delegates to {@link #getRunningProfilesJSON(UriInfo)}</li>
+  *   <li>{@code completed} - delegates to {@link #getCompletedProfilesJSON(UriInfo)}</li>
+  *   <li>{@code all}, any other value, or no value at all - delegates to
+  *       {@link #getProfilesJSON(UriInfo)}</li>
+  * </ul>
+  *
+  * @param uriInfo request URI information, forwarded unchanged to the delegate
+  * @param status value of the {@code status} query parameter; may be {@code null}
+  *               when the caller omits it
+  * @return the response produced by the selected delegate
+  */
+ @GET
+ @Path("/profiles/json")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getSpecificJSON(@Context UriInfo uriInfo, @QueryParam("status") String status) {
+   // JAX-RS injects null when the query parameter is absent, and a switch on a
+   // null String throws NullPointerException. Treat "no status" the same as the
+   // default branch below instead of failing the request.
+   if (status == null) {
+     return getProfilesJSON(uriInfo);
+   }
+   switch (status) {
+     case "running":
+       return getRunningProfilesJSON(uriInfo);
+     case "completed":
+       return getCompletedProfilesJSON(uriInfo);
+     case "all":
+     default:
+       return getProfilesJSON(uriInfo);
+   }
+ }
+
+ @GET
+ @Path("/profiles/running.json")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRunningProfilesJSON(@Context UriInfo uriInfo) {
try {
final QueryProfileStoreContext profileStoreContext =
work.getContext().getProfileStoreContext();
- final PersistentStore<QueryProfile> completed =
profileStoreContext.getCompletedProfileStore();
final TransientStore<QueryInfo> running =
profileStoreContext.getRunningProfileStore();
-
final List<String> errors = Lists.newArrayList();
-
final List<ProfileInfo> runningQueries = Lists.newArrayList();
final Iterator<Map.Entry<String, QueryInfo>> runningEntries =
running.entries();
@@ -276,22 +345,41 @@ public class ProfileResources {
if (principal.canManageProfileOf(profile.getUser())) {
runningQueries.add(
new ProfileInfo(work.getContext().getConfig(),
- runningEntry.getKey(), profile.getStart(),
System.currentTimeMillis(),
- profile.getForeman().getAddress(), profile.getQuery(),
+ runningEntry.getKey(), profile.getStart(),
+ System.currentTimeMillis(),
profile.getForeman().getAddress(),
+ profile.getQuery(),
ProfileUtil.getQueryStateDisplayName(profile.getState()),
- profile.getUser(), profile.getTotalCost(),
profile.getQueueName()));
+ profile.getUser(), profile.getTotalCost(),
+ profile.getQueueName()));
}
} catch (Exception e) {
errors.add(e.getMessage());
logger.error("Error getting running query info.", e);
}
}
-
Collections.sort(runningQueries, Collections.reverseOrder());
+ QProfilesRunning rProf = new QProfilesRunning(runningQueries, errors);
+ return errors.size() == 0
+ ? Response.ok().entity(rProf).build()
+ : Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(rProf)
+ .build();
+ } catch (Exception e) {
+ throw UserException.resourceError(e).message("Failed to get running
profiles from ephemeral store.").build(logger);
+ }
+ }
+ @GET
+ @Path("/profiles/completed.json")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getCompletedProfilesJSON(@Context UriInfo uriInfo) {
+ try {
+ final QueryProfileStoreContext profileStoreContext =
work.getContext().getProfileStoreContext();
+ final PersistentStore<QueryProfile> completed =
profileStoreContext.getCompletedProfileStore();
+ final List<String> errors = Lists.newArrayList();
final List<ProfileInfo> finishedQueries = Lists.newArrayList();
- //Defining #Profiles to load
+ // Defining #Profiles to load
int maxProfilesToLoad =
work.getContext().getConfig().getInt(ExecConstants.HTTP_MAX_PROFILES);
String maxProfilesParams =
uriInfo.getQueryParameters().getFirst(MAX_QPROFILES_PARAM);
if (maxProfilesParams != null && !maxProfilesParams.isEmpty()) {
@@ -299,7 +387,6 @@ public class ProfileResources {
}
final Iterator<Map.Entry<String, QueryProfile>> range =
completed.getRange(0, maxProfilesToLoad);
-
while (range.hasNext()) {
try {
final Map.Entry<String, QueryProfile> profileEntry = range.next();
@@ -317,21 +404,15 @@ public class ProfileResources {
logger.error("Error getting finished query profile.", e);
}
}
-
Collections.sort(finishedQueries, Collections.reverseOrder());
-
-
- QProfiles qProf = new QProfiles(runningQueries, finishedQueries, errors);
-
+ QProfilesCompleted cProf = new QProfilesCompleted(finishedQueries,
errors);
return errors.size() == 0
- ? Response.ok().entity(qProf).build()
- : Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(qProf)
- .build();
+ ? Response.ok().entity(cProf).build()
+ : Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(cProf)
+ .build();
} catch (Exception e) {
- throw UserException.resourceError(e)
- .message("Failed to get profiles from persistent or ephemeral store.")
- .build(logger);
+ throw UserException.resourceError(e).message("Failed to get completed
profiles from persistent store.").build(logger);
}
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryProfiles.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryProfiles.java
new file mode 100644
index 0000000..f7e15d1
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestQueryProfiles.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server.rest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.QueryType;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+/**
+ * REST tests for the profile listing endpoints ({@code /profiles.json},
+ * {@code /profiles/completed.json} and {@code /profiles/running.json}).
+ *
+ * <p>The test methods depend on each other and run in ascending name order
+ * (see {@link FixMethodOrder}): {@code testAdorableQuery} and
+ * {@code testBadQuery} submit one query each and record its text in
+ * {@link #SQL}; the profile tests that sort after them then check that those
+ * two queries show up in the listings.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestQueryProfiles extends ClusterTest {
+
+  private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json");
+  // In seconds (passed with TimeUnit.SECONDS below); large value kept for debugging.
+  private static final int TIMEOUT = 3000;
+  // Query texts recorded by the query tests: [0] succeeding query, [1] failing query.
+  private static final String[] SQL = new String[5];
+  private static int portNumber;
+
+  private final OkHttpClient httpClient = new OkHttpClient.Builder()
+      .connectTimeout(TIMEOUT, TimeUnit.SECONDS)
+      .writeTimeout(TIMEOUT, TimeUnit.SECONDS)
+      .readTimeout(TIMEOUT, TimeUnit.SECONDS).build();
+
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  /** Starts a cluster with the HTTP server enabled and remembers the port it bound to. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = new ClusterFixtureBuilder(dirTestWatcher)
+        .configProperty(ExecConstants.HTTP_ENABLE, true)
+        .configProperty(ExecConstants.HTTP_PORT_HUNT, true);
+    startCluster(builder);
+    portNumber = cluster.drillbit().getWebServerPort();
+  }
+
+  /** Submits a valid query; its text is stored in {@code SQL[0]} for the profile tests. */
+  @Test
+  public void testAdorableQuery() throws IOException {
+    String sql = "SELECT * FROM cp.`employee.json` LIMIT 20";
+    SQL[0] = sql;
+    QueryWrapper query = new QueryWrapper(sql, QueryType.SQL.name(), "10", null, null, null);
+    assertEquals(200, runQuery(query));
+  }
+
+  /**
+   * Submits a query against a file name that differs from the one used by the
+   * valid query; its text is stored in {@code SQL[1]}. The profile tests expect
+   * this query to be listed in state "Failed" even though the HTTP status
+   * asserted here is 200.
+   */
+  @Test
+  public void testBadQuery() throws IOException {
+    String sql = "SELECT * FROM cp.`employee123.json` LIMIT 20";
+    SQL[1] = sql;
+    QueryWrapper query = new QueryWrapper(sql, QueryType.SQL.name(), null, null, null, null);
+    int code = runQuery(query);
+    assertEquals(200, code);
+  }
+
+  /** The completed-profiles endpoint lists both previously submitted queries. */
+  @Test
+  public void testCompletedProfiles() throws Exception {
+    JSONObject jsonData = getJson("/profiles/completed.json");
+    assertFinishedQueries(jsonData);
+  }
+
+  /** The combined endpoint has five top-level fields, no running queries and both finished ones. */
+  @Test
+  public void testQueryProfiles() throws Exception {
+    JSONObject jsonData = getJson("/profiles.json");
+    assertEquals(5, jsonData.size());
+    assertEquals("[]", jsonData.get("runningQueries").toString());
+    assertFinishedQueries(jsonData);
+  }
+
+  /** The running-profiles endpoint has four top-level fields and an empty running list. */
+  @Test
+  public void testRunningProfiles() throws Exception {
+    JSONObject jsonData = getJson("/profiles/running.json");
+    assertEquals(4, jsonData.size());
+    assertEquals("[]", jsonData.get("runningQueries").toString());
+  }
+
+  /**
+   * Issues a GET against the test cluster's web server and parses the body as a JSON object.
+   *
+   * @param path request path starting with {@code /}
+   * @return the parsed response body
+   */
+  private JSONObject getJson(String path) throws Exception {
+    String url = String.format("http://localhost:%d%s", portNumber, path);
+    Request request = new Request.Builder().url(url).build();
+    try (Response response = httpClient.newCall(request).execute()) {
+      String responseBody = response.body().string();
+      return (JSONObject) new JSONParser().parse(responseBody);
+    }
+  }
+
+  /**
+   * Checks that {@code finishedQueries} holds exactly the two submitted queries,
+   * the failing one first and the succeeding one second.
+   */
+  private static void assertFinishedQueries(JSONObject profiles) {
+    JSONArray finishedQueries = (JSONArray) profiles.get("finishedQueries");
+    // Assert the size before indexing so that a short list fails with a readable
+    // assertion message rather than an IndexOutOfBoundsException from get().
+    assertEquals(2, finishedQueries.size());
+
+    JSONObject firstData = (JSONObject) finishedQueries.get(0);
+    JSONObject secondData = (JSONObject) finishedQueries.get(1);
+    assertEquals(SQL[1], firstData.get("query").toString());
+    assertEquals("Failed", firstData.get("state").toString());
+    assertEquals(SQL[0], secondData.get("query").toString());
+    assertEquals("Succeeded", secondData.get("state").toString());
+  }
+
+  /**
+   * Posts the query to {@code /query.json} as JSON.
+   *
+   * @return the HTTP status code of the response
+   */
+  private int runQuery(QueryWrapper query) throws IOException {
+    ObjectWriter writer = mapper.writerFor(QueryWrapper.class);
+    String json = writer.writeValueAsString(query);
+    String url = String.format("http://localhost:%d/query.json", portNumber);
+    Request request = new Request.Builder().url(url).post(RequestBody.create(json, JSON_MEDIA_TYPE)).build();
+    try (Response response = httpClient.newCall(request).execute()) {
+      return response.code();
+    }
+  }
+}