This is an automated email from the ASF dual-hosted git repository. xianjingfeng pushed a commit to branch issue_864 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 4e08e9507c6106e4eeb4b34334f57b51eb6559a0 Author: xianjingfeng <[email protected]> AuthorDate: Wed Jun 7 18:09:50 2023 +0800 Introduce Jersey to strengthen REST API --- common/pom.xml | 29 ++++++ .../org/apache/uniffle/common/web/JettyServer.java | 35 ++++--- .../common/web/resource/MetricResource.java | 94 +++++++++++++++++++ .../web/resource/PrometheusMetricResource.java | 60 ++++++++++++ .../apache/uniffle/common/metrics/TestUtils.java | 1 + .../uniffle/coordinator/CoordinatorServer.java | 54 +++-------- .../coordinator/web/resource/APIResource.java | 32 +++++++ .../coordinator/web/resource/ServerResource.java | 104 +++++++++++++++++++++ .../coordinator/web/servlet/BaseServlet.java | 84 ----------------- .../web/servlet/CancelDecommissionServlet.java | 50 ---------- .../web/servlet/DecommissionServlet.java | 50 ---------- .../coordinator/web/servlet/NodesServlet.java | 56 ----------- pom.xml | 44 ++++++++- 13 files changed, 396 insertions(+), 297 deletions(-) diff --git a/common/pom.xml b/common/pom.xml index daf590b9..50ce7f1e 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -145,6 +145,35 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> </dependency> + <dependency> + <groupId>org.glassfish.jersey.inject</groupId> + <artifactId>jersey-hk2</artifactId> + <version>2.39.1</version> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.media</groupId> + <artifactId>jersey-media-json-jackson</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> </dependencies> <build> diff --git a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java index 0065b0a0..aa28e789 100644 --- a/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java +++ b/common/src/main/java/org/apache/uniffle/common/web/JettyServer.java @@ -18,14 +18,15 @@ package org.apache.uniffle.common.web; import java.io.FileNotFoundException; -import java.net.BindException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.servlet.Servlet; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.server.HttpConfiguration; @@ -39,6 +40,8 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.glassfish.jersey.server.ServerProperties; +import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +56,8 @@ public class JettyServer { private Server server; private ServletContextHandler servletContextHandler; private int httpPort; + private ServletHolder servletHolder; + private Set<String> reourcePackages = new HashSet<>(); public JettyServer(RssBaseConf conf) throws FileNotFoundException { createServer(conf); @@ -73,17 +78,11 @@ public class JettyServer { conf.getLong(RssBaseConf.JETTY_HTTP_IDLE_TIMEOUT)); setRootServletHandler(); - if (conf.getBoolean(RssBaseConf.JETTY_SSL_ENABLE)) { addHttpsConnector(httpConfig, conf); } } - public void addServlet(Servlet servlet, String pathSpec) { - servletContextHandler.addServlet(new ServletHolder(servlet), pathSpec); - server.setHandler(servletContextHandler); - } - private ExecutorThreadPool createThreadPool(RssBaseConf conf) { int corePoolSize = conf.getInteger(RssBaseConf.JETTY_CORE_POOL_SIZE); int maxPoolSize = conf.getInteger(RssBaseConf.JETTY_MAX_POOL_SIZE); @@ -133,6 +132,20 @@ public class JettyServer { servletContextHandler = new ServletContextHandler(); servletContextHandler.setContextPath("/"); server.setHandler(servletContextHandler); + servletHolder = servletContextHandler.addServlet(ServletContainer.class, "/*"); + } + + public void addResourcePackages(String... packages) { + reourcePackages.addAll(Arrays.asList(packages)); + servletHolder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, String.join(",", reourcePackages)); + } + + public void registerInstance(Class<?> clazz, Object instance) { + registerInstance(clazz.getCanonicalName(), instance); + } + + public void registerInstance(String name, Object instance) { + servletContextHandler.setAttribute(name, instance); } public Server getServer() { @@ -142,7 +155,7 @@ public class JettyServer { public void start() throws Exception { try { server.start(); - } catch (BindException e) { + } catch (Exception e) { ExitUtils.terminate(1, "Fail to start jetty http server", e, LOG); } LOG.info("Jetty http server started, listening on port {}", httpPort); @@ -151,8 +164,4 @@ public class JettyServer { public void stop() throws Exception { server.stop(); } - - public ServletContextHandler getServletContextHandler() { - return this.servletContextHandler; - } } diff --git a/common/src/main/java/org/apache/uniffle/common/web/resource/MetricResource.java b/common/src/main/java/org/apache/uniffle/common/web/resource/MetricResource.java new file mode 100644 index 00000000..bb857dbb --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/web/resource/MetricResource.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.web.resource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; + +@Path("/metrics") +public class MetricResource { + @Context + private HttpServletRequest httpRequest; + + @Context + protected ServletContext servletContext; + + @GET + @Path("/{type}") + @Produces({ MediaType.APPLICATION_JSON }) + public MetricsJsonObj metrics(@PathParam("type") String type) { + Enumeration<Collector.MetricFamilySamples> mfs = + getCollectorRegistry(type).filteredMetricFamilySamples(this.parse(httpRequest)); + List<Collector.MetricFamilySamples.Sample> metrics = new LinkedList<>(); + while (mfs.hasMoreElements()) { + Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement(); + metrics.addAll(metricFamilySamples.samples); + } + return new MetricsJsonObj(metrics, System.currentTimeMillis()); + } + + private CollectorRegistry getCollectorRegistry(String type) { + CollectorRegistry registry = (CollectorRegistry) servletContext.getAttribute( + CollectorRegistry.class.getCanonicalName() + "#" + type); + if (registry == null) { + throw new RuntimeException(String.format("Metric type[%s] not supported", type)); + } + return registry; + } + + private static class MetricsJsonObj { + + private final List<Collector.MetricFamilySamples.Sample> metrics; + private final long timeStamp; + + MetricsJsonObj(List<Collector.MetricFamilySamples.Sample> metrics, long timeStamp) { + this.metrics = metrics; + this.timeStamp = timeStamp; + } + + public List<Collector.MetricFamilySamples.Sample> getMetrics() { + return metrics; + } + + public long getTimeStamp() { + return timeStamp; + } + + } + + private Set<String> parse(HttpServletRequest req) { + String[] includedParam = req.getParameterValues("name[]"); + return includedParam == null ? Collections.emptySet() : new HashSet<>(Arrays.asList(includedParam)); + } +} diff --git a/common/src/main/java/org/apache/uniffle/common/web/resource/PrometheusMetricResource.java b/common/src/main/java/org/apache/uniffle/common/web/resource/PrometheusMetricResource.java new file mode 100644 index 00000000..446c6cea --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/web/resource/PrometheusMetricResource.java @@ -0,0 +1,60 @@ +package org.apache.uniffle.common.web.resource; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Context; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.common.TextFormat; + +@Path("/prometheus/metrics") +public class PrometheusMetricResource { + @Context + private HttpServletRequest httpRequest; + @Context + private HttpServletResponse httpServletResponse; + + @Context + protected ServletContext servletContext; + + @GET + @Path("/{type}") + public String metrics(@PathParam("type") String type) throws IOException { + httpServletResponse.setStatus(200); + httpServletResponse.setContentType("text/plain; version=0.0.4; charset=utf-8"); + Writer writer = new BufferedWriter(httpServletResponse.getWriter()); + + try { + TextFormat.write004(writer, getCollectorRegistry(type).filteredMetricFamilySamples(this.parse(httpRequest))); + writer.flush(); + } finally { + writer.close(); + } + return null; + } + + private CollectorRegistry getCollectorRegistry(String type) { + CollectorRegistry registry = (CollectorRegistry) servletContext.getAttribute( + CollectorRegistry.class.getCanonicalName() + "#" + type); + if (registry == null) { + throw new RuntimeException(String.format("Metric type[%s] not supported", type)); + } + return registry; + } + + private Set<String> parse(HttpServletRequest req) { + String[] includedParam = req.getParameterValues("name[]"); + return includedParam == null ? Collections.emptySet() : new HashSet<>(Arrays.asList(includedParam)); + } +} diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java index ea623ea5..b0f6f483 100644 --- a/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java +++ b/common/src/test/java/org/apache/uniffle/common/metrics/TestUtils.java @@ -49,6 +49,7 @@ public class TestUtils { HttpURLConnection con = (HttpURLConnection) url.openConnection(); con.setDoOutput(true); con.setRequestMethod("POST"); + con.setRequestProperty("Content-type", "application/json"); StringBuilder content = new StringBuilder(); try (OutputStream outputStream = con.getOutputStream();) { outputStream.write(postData.getBytes()); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 3fb185f4..9dcfe20f 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -35,16 +35,12 @@ import org.apache.uniffle.common.rpc.ServerInterface; import org.apache.uniffle.common.security.SecurityConfig; import org.apache.uniffle.common.security.SecurityContextFactory; import org.apache.uniffle.common.util.RssUtils; -import org.apache.uniffle.common.web.CommonMetricsServlet; import org.apache.uniffle.common.web.JettyServer; import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics; import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory; import org.apache.uniffle.coordinator.util.CoordinatorUtils; -import org.apache.uniffle.coordinator.web.servlet.CancelDecommissionServlet; -import org.apache.uniffle.coordinator.web.servlet.DecommissionServlet; -import org.apache.uniffle.coordinator.web.servlet.NodesServlet; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE; import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE; @@ -156,10 +152,6 @@ public class CoordinatorServer extends ReconfigurableBase { int port = coordinatorConf.getInteger(CoordinatorConf.RPC_SERVER_PORT); id = ip + "-" + port; LOG.info("Start to initialize coordinator {}", id); - jettyServer = new JettyServer(coordinatorConf); - registerRESTAPI(); - // register metrics first to avoid NPE problem when add dynamic metrics - registerMetrics(); coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id); this.applicationManager = new ApplicationManager(coordinatorConf); @@ -174,7 +166,6 @@ public class CoordinatorServer extends ReconfigurableBase { } SecurityContextFactory.get().init(securityConfig); - // load default hadoop configuration Configuration hadoopConf = new Configuration(); ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf, hadoopConf); @@ -186,21 +177,20 @@ public class CoordinatorServer extends ReconfigurableBase { this.assignmentStrategy = assignmentStrategyFactory.getAssignmentStrategy(); this.accessManager = new AccessManager(coordinatorConf, clusterManager, applicationManager.getQuotaManager(), hadoopConf); + registerMetrics(); CoordinatorFactory coordinatorFactory = new CoordinatorFactory(this); server = coordinatorFactory.getServer(); - } - - private void registerRESTAPI() throws Exception { - LOG.info("Register REST API"); - jettyServer.addServlet( - new NodesServlet(this), - "/api/server/nodes"); - jettyServer.addServlet( - new DecommissionServlet(this), - "/api/server/decommission"); - jettyServer.addServlet( - new CancelDecommissionServlet(this), - "/api/server/cancelDecommission"); + jettyServer = new JettyServer(coordinatorConf); + // register packages and instances for jersey + jettyServer.addResourcePackages("org.apache.uniffle.coordinator.web.resource", + "org.apache.uniffle.common.web.resource"); + jettyServer.registerInstance(ClusterManager.class, clusterManager); + jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() + "#server", + CoordinatorMetrics.getCollectorRegistry()); + jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() + "#grpc", + grpcMetrics.getCollectorRegistry()); + jettyServer.registerInstance(CollectorRegistry.class.getCanonicalName() + "#jvm", + JvmMetrics.getCollectorRegistry()); } private void registerMetrics() throws Exception { @@ -213,26 +203,6 @@ public class CoordinatorServer extends ReconfigurableBase { CollectorRegistry jvmCollectorRegistry = new CollectorRegistry(true); JvmMetrics.register(jvmCollectorRegistry, verbose); - LOG.info("Add metrics servlet"); - jettyServer.addServlet( - new CommonMetricsServlet(CoordinatorMetrics.getCollectorRegistry()), - "/metrics/server"); - jettyServer.addServlet( - new CommonMetricsServlet(grpcMetrics.getCollectorRegistry()), - "/metrics/grpc"); - jettyServer.addServlet( - new CommonMetricsServlet(JvmMetrics.getCollectorRegistry()), - "/metrics/jvm"); - jettyServer.addServlet( - new CommonMetricsServlet(CoordinatorMetrics.getCollectorRegistry(), true), - "/prometheus/metrics/server"); - jettyServer.addServlet( - new CommonMetricsServlet(grpcMetrics.getCollectorRegistry(), true), - "/prometheus/metrics/grpc"); - jettyServer.addServlet( - new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true), - "/prometheus/metrics/jvm"); - metricReporter = MetricReporterFactory.getMetricReporter(coordinatorConf, id); if (metricReporter != null) { metricReporter.addCollectorRegistry(CoordinatorMetrics.getCollectorRegistry()); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java new file mode 100644 index 00000000..cce108b2 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.resource; + +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("api") +@Produces({ MediaType.APPLICATION_JSON }) +public class APIResource { + @Path("server") + public Class<ServerResource> getServerResource() { + return ServerResource.class; + } + +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java new file mode 100644 index 00000000..29976357 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/ServerResource.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.resource; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import javax.servlet.ServletContext; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +import org.apache.commons.collections.CollectionUtils; + +import org.apache.uniffle.coordinator.ClusterManager; +import org.apache.uniffle.coordinator.ServerNode; +import org.apache.uniffle.coordinator.web.Response; +import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest; +import org.apache.uniffle.coordinator.web.request.DecommissionRequest; + +@Path("/server") +@Produces({ MediaType.APPLICATION_JSON }) +public class ServerResource { + @Context + private HttpServletRequest httpRequest; + @Context + protected ServletContext servletContext; + + @GET + @Path("/nodes") + public Response<List<ServerNode>> nodes(@QueryParam("id") String id, @QueryParam("status") String status) { + ClusterManager clusterManager = getClusterManager(); + List<ServerNode> serverList = clusterManager.getServerList(Collections.emptySet()); + serverList = serverList.stream().filter((server) -> { + if (id != null && !id.equals(server.getId())) { + return false; + } + if (status != null && !server.getStatus().toString().equals(status)) { + return false; + } + return true; + }).collect(Collectors.toList()); + serverList.sort(Comparator.comparing(ServerNode::getId)); + return Response.success(serverList); + } + + @POST + @Path("/cancelDecommission") + @Produces({ MediaType.APPLICATION_JSON }) + public Response<Object> cancelDecommission(CancelDecommissionRequest params) { + if (CollectionUtils.isEmpty(params.getServerIds())) { + return Response.fail("Parameter[serverIds] should not be null!"); + } + ClusterManager clusterManager = getClusterManager(); + try { + params.getServerIds().forEach(clusterManager::cancelDecommission); + } catch (Exception e) { + return Response.fail(e.getMessage()); + } + return Response.success(null); + } + + @POST + @Path("/decommission") + @Produces({ MediaType.APPLICATION_JSON }) + public Response<Object> decommission(DecommissionRequest params) { + if (CollectionUtils.isEmpty(params.getServerIds())) { + return Response.fail("Parameter[serverIds] should not be null!"); + } + ClusterManager clusterManager = getClusterManager(); + try { + params.getServerIds().forEach(clusterManager::decommission); + } catch (Exception e) { + return Response.fail(e.getMessage()); + } + return Response.success(null); + } + + private ClusterManager getClusterManager() { + return (ClusterManager) servletContext.getAttribute( + ClusterManager.class.getCanonicalName()); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java deleted file mode 100644 index 99948dae..00000000 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/BaseServlet.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.coordinator.web.servlet; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.concurrent.Callable; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.uniffle.coordinator.web.Response; - -public abstract class BaseServlet<T> extends HttpServlet { - public static final String JSON_MIME_TYPE = "application/json"; - final ObjectMapper mapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException { - writeJSON(resp, handlerRequest(() -> handleGet(req, resp))); - } - - @Override - protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { - writeJSON(resp, handlerRequest(() -> handlePost(req, resp))); - } - - private Response<T> handlerRequest( - Callable<Response<T>> function) { - Response<T> response; - try { - // todo: Do something for authentication - response = function.call(); - } catch (Exception e) { - response = Response.fail(e.getMessage()); - } - return response; - } - - protected Response<T> handleGet( - HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - throw new IOException("Method not support!"); - } - - protected Response<T> handlePost( - HttpServletRequest req, - HttpServletResponse resp) throws ServletException, IOException { - throw new IOException("Method not support!"); - } - - protected void writeJSON(final HttpServletResponse resp, final Object obj) - throws IOException { - if (obj == null) { - return; - } - resp.setContentType(JSON_MIME_TYPE); - final OutputStream stream = resp.getOutputStream(); - mapper.writeValue(stream, obj); - } - - protected <T> T parseParamsFromJson(HttpServletRequest req, Class<T> clazz) throws IOException { - return mapper.readValue(req.getInputStream(), clazz); - } -} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java deleted file mode 100644 index 24c77f8c..00000000 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/CancelDecommissionServlet.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.coordinator.web.servlet; - -import java.io.IOException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.collections.CollectionUtils; - -import org.apache.uniffle.coordinator.ClusterManager; -import org.apache.uniffle.coordinator.CoordinatorServer; -import org.apache.uniffle.coordinator.web.Response; -import org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest; - -public class CancelDecommissionServlet extends BaseServlet<Object> { - private final CoordinatorServer coordinator; - - public CancelDecommissionServlet(CoordinatorServer coordinator) { - this.coordinator = coordinator; - } - - @Override - protected Response<Object> handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException { - CancelDecommissionRequest params = parseParamsFromJson(req, CancelDecommissionRequest.class); - if (CollectionUtils.isEmpty(params.getServerIds())) { - return Response.fail("Parameter[serverIds] should not be null!"); - } - ClusterManager clusterManager = coordinator.getClusterManager(); - params.getServerIds().forEach((serverId) -> { - clusterManager.cancelDecommission(serverId); - }); - return Response.success(null); - } -} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java deleted file mode 100644 index 3f3ab1ef..00000000 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/DecommissionServlet.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.coordinator.web.servlet; - -import java.io.IOException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.collections.CollectionUtils; - -import org.apache.uniffle.coordinator.ClusterManager; -import org.apache.uniffle.coordinator.CoordinatorServer; -import org.apache.uniffle.coordinator.web.Response; -import org.apache.uniffle.coordinator.web.request.DecommissionRequest; - -public class DecommissionServlet extends BaseServlet<Object> { - private final CoordinatorServer coordinator; - - public DecommissionServlet(CoordinatorServer coordinator) { - this.coordinator = coordinator; - } - - @Override - protected Response<Object> handlePost(HttpServletRequest req, HttpServletResponse resp) throws IOException { - DecommissionRequest params = parseParamsFromJson(req, DecommissionRequest.class); - if (CollectionUtils.isEmpty(params.getServerIds())) { - return Response.fail("Parameter[serverIds] should not be null!"); - } - ClusterManager clusterManager = coordinator.getClusterManager(); - params.getServerIds().forEach((serverId) -> { - clusterManager.decommission(serverId); - }); - return Response.success(null); - } -} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java deleted file mode 100644 index 788f7f0a..00000000 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/servlet/NodesServlet.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.coordinator.web.servlet; - -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.uniffle.coordinator.CoordinatorServer; -import org.apache.uniffle.coordinator.ServerNode; -import org.apache.uniffle.coordinator.web.Response; - - -public class NodesServlet extends BaseServlet<List<ServerNode>> { - private final CoordinatorServer coordinator; - - public NodesServlet(CoordinatorServer coordinator) { - this.coordinator = coordinator; - } - - @Override - protected Response<List<ServerNode>> handleGet(HttpServletRequest req, HttpServletResponse resp) { - List<ServerNode> serverList = coordinator.getClusterManager().getServerList(Collections.EMPTY_SET); - String id = req.getParameter("id"); - String status = req.getParameter("status"); - serverList = serverList.stream().filter((server) -> { - if (id != null && !id.equals(server.getId())) { - return false; - } - if (status != null && !server.getStatus().toString().equals(status)) { - return false; - } - return true; - }).collect(Collectors.toList()); - Collections.sort(serverList, Comparator.comparing(ServerNode::getId)); - return Response.success(serverList); - } -} diff --git a/pom.xml b/pom.xml index 670e6862..cd5b0690 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ <codehaus.jackson.version>1.9.13</codehaus.jackson.version> <error_prone_annotations.version>2.10.0</error_prone_annotations.version> <execution.root>${user.dir}</execution.root> - <fasterxml.jackson.version>2.10.0</fasterxml.jackson.version> + <fasterxml.jackson.version>2.14.1</fasterxml.jackson.version> <grpc.version>1.47.0</grpc.version> <gson.version>2.9.0</gson.version> <guava.version>31.0.1-jre</guava.version> @@ -53,7 +53,8 @@ <httpcore.version>4.4.4</httpcore.version> <java.version>1.8</java.version> <javax.annotation.version>1.3.2</javax.annotation.version> - <jetty.version>9.3.24.v20180605</jetty.version> + <jetty.version>9.4.49.v20220914</jetty.version> + <jersey.version>2.39.1</jersey.version> <junit.jupiter.version>5.8.2</junit.jupiter.version> <junit.platform.version>1.8.2</junit.platform.version> <system.stubs.version>2.0.1</system.stubs.version> @@ -352,6 +353,10 @@ <groupId>net.minidev</groupId> <artifactId>json-smart</artifactId> </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>*</artifactId> + </exclusion> <exclusion> <groupId>com.sun.jersey</groupId> <artifactId>jersey-json</artifactId> @@ -670,6 +675,41 @@ <version>${jetty.version}</version> </dependency> + <dependency> + <groupId>org.glassfish.jersey.inject</groupId> + <artifactId>jersey-hk2</artifactId> + <version>${jersey.version}</version> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + <version>${jersey.version}</version> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.containers</groupId> + <artifactId>jersey-container-jetty-servlet</artifactId> + <version>${jersey.version}</version> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.media</groupId> + <artifactId>jersey-media-json-jackson</artifactId> + <version>${jersey.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>${jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>${jetty.version}</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + <version>${jetty.version}</version> + </dependency> </dependencies> </dependencyManagement>
