This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new d27649c Moved metrics for coordinator to test coordinator class d27649c is described below commit d27649cb67f4fcf0c472ad5138ae0ac6ecef9e42 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Apr 27 21:14:09 2021 +0000 Moved metrics for coordinator to test coordinator class --- server/compaction-coordinator/pom.xml | 17 -- .../coordinator/CompactionCoordinator.java | 78 ---------- .../coordinator/CompactionCoordinatorTest.java | 6 - test/pom.xml | 13 ++ .../apache/accumulo/test/ExternalCompactionIT.java | 8 +- .../accumulo/test/TestCompactionCoordinator.java | 171 +++++++++++++++++++++ 6 files changed, 188 insertions(+), 105 deletions(-) diff --git a/server/compaction-coordinator/pom.xml b/server/compaction-coordinator/pom.xml index 0793491..93c12b4 100644 --- a/server/compaction-coordinator/pom.xml +++ b/server/compaction-coordinator/pom.xml @@ -36,10 +36,6 @@ <optional>true</optional> </dependency> <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> @@ -68,19 +64,6 @@ <artifactId>zookeeper</artifactId> </dependency> <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-server</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-util</artifactId> - </dependency> - <dependency> - <groupId>org.eclipse.jetty.toolchain</groupId> - <artifactId>jetty-jakarta-servlet-api</artifactId> - <version>5.0.2</version> - </dependency> - <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 1107e4d..b8b1fa3 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -20,7 +20,6 @@ package org.apache.accumulo.coordinator; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; -import java.io.IOException; import java.net.UnknownHostException; import java.time.Duration; import java.util.List; @@ -30,10 +29,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import jakarta.servlet.ServletException; -import jakarta.servlet.http.HttpServletRequest; -import jakarta.servlet.http.HttpServletResponse; - import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -83,18 +78,9 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.AbstractHandler; -import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.server.handler.ContextHandlerCollection; -import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; - public class CompactionCoordinator extends AbstractServer implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface, LiveTServerSet.Listener { @@ -107,8 +93,6 @@ public class CompactionCoordinator extends AbstractServer protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); - private static final Gson GSON = new Gson(); - /* Map of compactionId to RunningCompactions */ protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING = new ConcurrentHashMap<>(); @@ -116,7 +100,6 @@ public class CompactionCoordinator extends AbstractServer /* Map of queue name to last time compactor called to get a compaction job */ private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); - private final ExternalCompactionMetrics metrics = new ExternalCompactionMetrics(); private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); protected SecurityOperation security; protected final AccumuloConfiguration aconf; @@ -238,50 +221,6 @@ public class CompactionCoordinator extends AbstractServer return sp; } - protected Server startHttpMetricServer() throws Exception { - int port = getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT) - .iterator().next(); - String hostname = getHostname(); - Server metricServer = new Server(new QueuedThreadPool(4, 1)); - ServerConnector c = new ServerConnector(metricServer); - c.setHost(hostname); - c.setPort(port); - metricServer.addConnector(c); - ContextHandlerCollection handlers = new ContextHandlerCollection(); - metricServer.setHandler(handlers); - ContextHandler metricContext = new ContextHandler("/metrics"); - metricContext.setHandler(new AbstractHandler() { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - baseRequest.setHandled(true); - response.setStatus(200); - response.setContentType("application/json"); - metrics.setRunning(RUNNING.size()); - LOG.debug("Returning metrics: {}", metrics); - response.getWriter().print(GSON.toJson(metrics)); - } - }); - handlers.addHandler(metricContext); - - ContextHandler detailsContext = new ContextHandler("/details"); - detailsContext.setHandler(new AbstractHandler() { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - baseRequest.setHandled(true); - response.setStatus(200); - response.setContentType("application/json"); - response.getWriter().print(GSON.toJson(RUNNING)); - } - }); - handlers.addHandler(detailsContext); - - metricServer.start(); - LOG.info("Metrics HTTP server listening on {}:{}", hostname, port); - return metricServer; - } - @Override public void run() { @@ -293,13 +232,6 @@ public class CompactionCoordinator extends AbstractServer } final HostAndPort clientAddress = coordinatorAddress.address; - Server metricServer = null; - try { - metricServer = startHttpMetricServer(); - } catch (Exception e1) { - throw new RuntimeException("Failed to start metric http server", e1); - } - try { getCoordinatorLock(clientAddress); } catch (KeeperException | InterruptedException e) { @@ -470,13 +402,6 @@ public class CompactionCoordinator extends AbstractServer } LOG.info("Shutting down"); - if (null != metricServer) { - try { - metricServer.stop(); - } catch (Exception e) { - LOG.error("Error stopping metric server", e); - } - } } protected long getMissingCompactorWarningTime() { @@ -560,7 +485,6 @@ public class CompactionCoordinator extends AbstractServer } RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), new RunningCompaction(job, compactorAddress, tserver)); - metrics.incrementStarted(); LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); result = job; break; @@ -701,7 +625,6 @@ public class CompactionCoordinator extends AbstractServer SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } LOG.info("Compaction completed, id: {}, stats: {}", externalCompactionId, stats); - metrics.incrementCompleted(); final var ecid = ExternalCompactionId.of(externalCompactionId); final RunningCompaction rc = RUNNING.get(ecid); if (null != rc) { @@ -727,7 +650,6 @@ public class CompactionCoordinator extends AbstractServer SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } LOG.info("Compaction failed, id: {}", externalCompactionId); - metrics.incrementFailed(); final var ecid = ExternalCompactionId.of(externalCompactionId); final RunningCompaction rc = RUNNING.get(ecid); if (null != rc) { diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 8f1bee3..2aa2b4f 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -60,7 +60,6 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.KeeperException; import org.easymock.EasyMock; -import org.eclipse.jetty.server.Server; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -186,11 +185,6 @@ public class CompactionCoordinatorTest { }; } - @Override - protected Server startHttpMetricServer() throws Exception { - return null; - } - } @Test diff --git a/test/pom.xml b/test/pom.xml index 921425e..0a1c788 100644 --- a/test/pom.xml +++ b/test/pom.xml @@ -187,6 +187,19 @@ <artifactId>easymock</artifactId> </dependency> <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty.toolchain</groupId> + <artifactId>jetty-jakarta-servlet-api</artifactId> + <version>5.0.2</version> + </dependency> + <dependency> <groupId>org.jline</groupId> <artifactId>jline</artifactId> </dependency> diff --git a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java index efaf042..27898d9 100644 --- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java @@ -188,7 +188,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { writeData(client, table1); cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - cluster.exec(CompactionCoordinator.class); + cluster.exec(TestCompactionCoordinator.class); compact(client, table1, 2, "DCQ1", false); // Wait for the compaction to start by waiting for 1 external compaction column @@ -260,7 +260,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { TableId tid = Tables.getTableId(getCluster().getServerContext(), table1); cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - cluster.exec(CompactionCoordinator.class); + cluster.exec(TestCompactionCoordinator.class); // Wait for the compaction to start by waiting for 1 external compaction column List<TabletMetadata> md = new ArrayList<>(); @@ -353,7 +353,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { // Wait for the coordinator to insert the running compaction metadata // entry into the metadata table, then cancel the compaction cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - cluster.exec(CompactionCoordinator.class); + cluster.exec(TestCompactionCoordinator.class); compact(client, table1, 2, "DCQ1", false); @@ -404,7 +404,7 @@ public class ExternalCompactionIT extends ConfigurableMacBase { // Wait for the coordinator to insert the running compaction metadata // entry into the metadata table, then delete the table. cluster.exec(ExternalDoNothingCompactor.class, "-q", "DCQ1"); - cluster.exec(CompactionCoordinator.class); + cluster.exec(TestCompactionCoordinator.class); List<TabletMetadata> md = new ArrayList<>(); TabletsMetadata tm = getCluster().getServerContext().getAmple().readTablets() diff --git a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java new file mode 100644 index 0000000..0d3e708 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinator.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import java.io.IOException; +import java.net.UnknownHostException; + +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +import org.apache.accumulo.coordinator.CompactionCoordinator; +import org.apache.accumulo.coordinator.ExternalCompactionMetrics; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.thrift.TException; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +public class TestCompactionCoordinator extends CompactionCoordinator + implements org.apache.accumulo.core.compaction.thrift.CompactionCoordinator.Iface { + + private static final Logger LOG = LoggerFactory.getLogger(TestCompactionCoordinator.class); + private static final Gson GSON = new Gson(); + + private final ExternalCompactionMetrics metrics = new ExternalCompactionMetrics(); + private Server metricServer = null; + + protected TestCompactionCoordinator(ServerOpts opts, String[] args) { + super(opts, args); + } + + private Server startHttpMetricServer() throws Exception { + int port = getContext().getConfiguration().getPortStream(Property.COORDINATOR_METRICPORT) + .iterator().next(); + String hostname = getHostname(); + Server metricServer = new Server(new QueuedThreadPool(4, 1)); + ServerConnector c = new ServerConnector(metricServer); + c.setHost(hostname); + c.setPort(port); + metricServer.addConnector(c); + ContextHandlerCollection handlers = new ContextHandlerCollection(); + metricServer.setHandler(handlers); + ContextHandler metricContext = new ContextHandler("/metrics"); + metricContext.setHandler(new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("application/json"); + metrics.setRunning(RUNNING.size()); + LOG.debug("Returning metrics: {}", metrics); + response.getWriter().print(GSON.toJson(metrics)); + } + }); + handlers.addHandler(metricContext); + + ContextHandler detailsContext = new ContextHandler("/details"); + detailsContext.setHandler(new AbstractHandler() { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); + response.setStatus(200); + response.setContentType("application/json"); + response.getWriter().print(GSON.toJson(RUNNING)); + } + }); + handlers.addHandler(detailsContext); + + metricServer.start(); + LOG.info("Metrics HTTP server listening on {}:{}", hostname, port); + return metricServer; + } + + @Override + protected ServerAddress startCoordinatorClientService() throws UnknownHostException { + try { + return super.startCoordinatorClientService(); + } finally { + try { + metricServer = startHttpMetricServer(); + } catch (Exception e1) { + throw new RuntimeException("Failed to start metric http server", e1); + } + } + } + + @Override + public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String queueName, String compactorAddress, String externalCompactionId) throws TException { + TExternalCompactionJob job = super.getCompactionJob(tinfo, credentials, queueName, + compactorAddress, externalCompactionId); + if (null != job && null != job.getExternalCompactionId()) { + metrics.incrementStarted(); + } + return job; + } + + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats) throws TException { + try { + super.compactionCompleted(tinfo, credentials, externalCompactionId, textent, stats); + } finally { + metrics.incrementCompleted(); + } + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent) throws TException { + try { + super.compactionFailed(tinfo, credentials, externalCompactionId, extent); + } finally { + metrics.incrementFailed(); + } + } + + @Override + public void close() { + super.close(); + if (null != metricServer) { + try { + metricServer.stop(); + } catch (Exception e) { + LOG.error("Error stopping metric server", e); + } + } + } + + public static void main(String[] args) throws Exception { + try (TestCompactionCoordinator compactor = + new TestCompactionCoordinator(new ServerOpts(), args)) { + compactor.runServer(); + } + } + +}