http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java deleted file mode 100644 index 267f75a..0000000 --- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.bookkeeper.stats; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.health.HealthCheckRegistry; -import com.codahale.metrics.servlets.AdminServlet; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; - -/** - * Starts a jetty server on a configurable port to export stats. 
- */ -public class ServletReporter { - - private final MetricRegistry metricRegistry; - private final HealthCheckRegistry healthCheckRegistry; - private final int port; - private final Server jettyServer; - - public ServletReporter(MetricRegistry metricRegistry, - HealthCheckRegistry healthCheckRegistry, - int port) { - this.metricRegistry = metricRegistry; - this.healthCheckRegistry = healthCheckRegistry; - this.port = port; - this.jettyServer = new Server(port); - } - - public void start() throws Exception { - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath("/"); - jettyServer.setHandler(context); - - context.addEventListener(new HealthCheckServletContextListener(healthCheckRegistry)); - context.addEventListener(new MetricsServletContextListener(metricRegistry)); - context.addServlet(new ServletHolder(new AdminServlet()), "/*"); - - jettyServer.start(); - } - - public void stop() throws Exception { - jettyServer.stop(); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java deleted file mode 100644 index 5bdb3ce..0000000 --- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}. 
- */ -package org.apache.bookkeeper.stats; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java deleted file mode 100644 index 96bc338..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ClientUtils.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import org.apache.distributedlog.client.DistributedLogClientImpl; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.commons.lang3.tuple.Pair; - -/** - * DistributedLog Client Related Utils. 
- */ -public class ClientUtils { - - public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) { - DistributedLogClientImpl clientImpl = builder.buildClient(); - return Pair.of((DistributedLogClient) clientImpl, (MonitorServiceClient) clientImpl); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java deleted file mode 100644 index 9cc085d..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java +++ /dev/null @@ -1,352 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.LocalDLMEmulator; -import org.apache.distributedlog.client.routing.SingleHostRoutingService; -import org.apache.distributedlog.impl.metadata.BKDLConfig; -import org.apache.distributedlog.metadata.DLMetadata; -import org.apache.distributedlog.service.placement.EqualLoadAppraiser; -import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; -import com.twitter.finagle.builder.Server; -import java.io.File; -import java.net.BindException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; -import org.apache.bookkeeper.stats.NullStatsProvider; -import org.apache.bookkeeper.util.IOUtils; -import org.apache.bookkeeper.util.LocalBookKeeper; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * DistributedLog Cluster is an emulator to run distributedlog components. - */ -public class DistributedLogCluster { - - private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class); - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Builder to build distributedlog cluster. 
- */ - public static class Builder { - - int numBookies = 3; - boolean shouldStartZK = true; - String zkHost = "127.0.0.1"; - int zkPort = 0; - boolean shouldStartProxy = true; - int proxyPort = 7000; - boolean thriftmux = false; - DistributedLogConfiguration dlConf = new DistributedLogConfiguration() - .setLockTimeout(10) - .setOutputBufferSize(0) - .setImmediateFlushEnabled(true); - ServerConfiguration bkConf = new ServerConfiguration(); - - private Builder() {} - - /** - * How many bookies to run. By default is 3. - * - * @return builder - */ - public Builder numBookies(int numBookies) { - this.numBookies = numBookies; - return this; - } - - /** - * Whether to start zookeeper? By default is true. - * - * @param startZK - * flag to start zookeeper? - * @return builder - */ - public Builder shouldStartZK(boolean startZK) { - this.shouldStartZK = startZK; - return this; - } - - /** - * ZooKeeper server to run. By default it runs locally on '127.0.0.1'. - * - * @param zkServers - * zk servers - * @return builder - */ - public Builder zkServers(String zkServers) { - this.zkHost = zkServers; - return this; - } - - /** - * ZooKeeper server port to listen on. By default it listens on 2181. - * - * @param zkPort - * zookeeper server port. - * @return builder. - */ - public Builder zkPort(int zkPort) { - this.zkPort = zkPort; - return this; - } - - /** - * Whether to start proxy or not. By default is true. - * - * @param startProxy - * whether to start proxy or not. - * @return builder - */ - public Builder shouldStartProxy(boolean startProxy) { - this.shouldStartProxy = startProxy; - return this; - } - - /** - * Port that proxy server to listen on. By default is 7000. - * - * @param proxyPort - * port that proxy server to listen on. - * @return builder - */ - public Builder proxyPort(int proxyPort) { - this.proxyPort = proxyPort; - return this; - } - - /** - * Set the distributedlog configuration. 
- * - * @param dlConf - * distributedlog configuration - * @return builder - */ - public Builder dlConf(DistributedLogConfiguration dlConf) { - this.dlConf = dlConf; - return this; - } - - /** - * Set the Bookkeeper server configuration. - * - * @param bkConf - * bookkeeper server configuration - * @return builder - */ - public Builder bkConf(ServerConfiguration bkConf) { - this.bkConf = bkConf; - return this; - } - - /** - * Enable thriftmux for the dl server. - * - * @param enabled flag to enable thriftmux - * @return builder - */ - public Builder thriftmux(boolean enabled) { - this.thriftmux = enabled; - return this; - } - - public DistributedLogCluster build() throws Exception { - // build the cluster - return new DistributedLogCluster( - dlConf, - bkConf, - numBookies, - shouldStartZK, - zkHost, - zkPort, - shouldStartProxy, - proxyPort, - thriftmux); - } - } - - /** - * Run a distributedlog proxy server. - */ - public static class DLServer { - - static final int MAX_RETRIES = 20; - static final int MIN_PORT = 1025; - static final int MAX_PORT = 65535; - - int proxyPort; - - public final InetSocketAddress address; - public final Pair<DistributedLogServiceImpl, Server> dlServer; - private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null); - - protected DLServer(DistributedLogConfiguration dlConf, - URI uri, - int basePort, - boolean thriftmux) throws Exception { - proxyPort = basePort; - - boolean success = false; - int retries = 0; - Pair<DistributedLogServiceImpl, Server> serverPair = null; - while (!success) { - try { - org.apache.distributedlog.service.config.ServerConfiguration serverConf = - new org.apache.distributedlog.service.config.ServerConfiguration(); - serverConf.loadConf(dlConf); - serverConf.setServerShardId(proxyPort); - serverPair = DistributedLogServer.runServer( - serverConf, - dlConf, - uri, - new IdentityStreamPartitionConverter(), - routingService, - new NullStatsProvider(), - proxyPort, - thriftmux, - new 
EqualLoadAppraiser()); - routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort)); - routingService.startService(); - serverPair.getLeft().startPlacementPolicy(); - success = true; - } catch (BindException be) { - retries++; - if (retries > MAX_RETRIES) { - throw be; - } - proxyPort++; - if (proxyPort > MAX_PORT) { - proxyPort = MIN_PORT; - } - } - } - - LOG.info("Running DL on port {}", proxyPort); - - dlServer = serverPair; - address = DLSocketAddress.getSocketAddress(proxyPort); - } - - public InetSocketAddress getAddress() { - return address; - } - - public void shutdown() { - DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS); - routingService.stopService(); - } - } - - private final DistributedLogConfiguration dlConf; - private final ZooKeeperServerShim zks; - private final LocalDLMEmulator dlmEmulator; - private DLServer dlServer; - private final boolean shouldStartProxy; - private final int proxyPort; - private final boolean thriftmux; - private final List<File> tmpDirs = new ArrayList<File>(); - - private DistributedLogCluster(DistributedLogConfiguration dlConf, - ServerConfiguration bkConf, - int numBookies, - boolean shouldStartZK, - String zkServers, - int zkPort, - boolean shouldStartProxy, - int proxyPort, - boolean thriftmux) throws Exception { - this.dlConf = dlConf; - if (shouldStartZK) { - File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog"); - tmpDirs.add(zkTmpDir); - if (0 == zkPort) { - Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir); - this.zks = serverAndPort.getLeft(); - zkPort = serverAndPort.getRight(); - } else { - this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir); - } - } else { - this.zks = null; - } - this.dlmEmulator = LocalDLMEmulator.newBuilder() - .numBookies(numBookies) - .zkHost(zkServers) - .zkPort(zkPort) - .serverConf(bkConf) - .shouldStartZK(false) - .build(); - this.shouldStartProxy = shouldStartProxy; - this.proxyPort 
= proxyPort; - this.thriftmux = thriftmux; - } - - public void start() throws Exception { - this.dlmEmulator.start(); - BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl"); - DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri()); - if (shouldStartProxy) { - this.dlServer = new DLServer( - dlConf, - this.dlmEmulator.getUri(), - proxyPort, - thriftmux); - } else { - this.dlServer = null; - } - } - - public void stop() throws Exception { - if (null != dlServer) { - this.dlServer.shutdown(); - } - this.dlmEmulator.teardown(); - if (null != this.zks) { - this.zks.stop(); - } - for (File dir : tmpDirs) { - FileUtils.forceDeleteOnExit(dir); - } - } - - public URI getUri() { - return this.dlmEmulator.getUri(); - } - - public String getZkServers() { - return this.dlmEmulator.getZkServers(); - } - - public String getProxyFinagleStr() { - return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java deleted file mode 100644 index 81e476b..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java +++ /dev/null @@ -1,460 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.config.DynamicConfigurationFactory; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.service.announcer.Announcer; -import org.apache.distributedlog.service.announcer.NOPAnnouncer; -import org.apache.distributedlog.service.announcer.ServerSetAnnouncer; -import org.apache.distributedlog.service.config.DefaultStreamConfigProvider; -import org.apache.distributedlog.service.config.NullStreamConfigProvider; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.config.ServiceStreamConfigProvider; -import org.apache.distributedlog.service.config.StreamConfigProvider; -import org.apache.distributedlog.service.placement.EqualLoadAppraiser; -import org.apache.distributedlog.service.placement.LoadAppraiser; -import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; -import org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.thrift.service.DistributedLogService; -import 
org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.SchedulerUtils; -import com.twitter.finagle.Stack; -import com.twitter.finagle.ThriftMuxServer$; -import com.twitter.finagle.builder.Server; -import com.twitter.finagle.builder.ServerBuilder; -import com.twitter.finagle.stats.NullStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientIdRequiredFilter; -import com.twitter.finagle.thrift.ThriftServerFramedCodec; -import com.twitter.finagle.transport.Transport; -import com.twitter.util.Duration; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; -import scala.Tuple2; - -/** - * Running the distributedlog proxy server. 
- */ -public class DistributedLogServer { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class); - private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName(); - - private DistributedLogServiceImpl dlService = null; - private Server server = null; - private RoutingService routingService; - private StatsProvider statsProvider; - private Announcer announcer = null; - private ScheduledExecutorService configExecutorService; - private long gracefulShutdownMs = 0L; - - private final StatsReceiver statsReceiver; - private final CountDownLatch keepAliveLatch = new CountDownLatch(1); - private final Optional<String> uri; - private final Optional<String> conf; - private final Optional<String> streamConf; - private final Optional<Integer> port; - private final Optional<Integer> statsPort; - private final Optional<Integer> shardId; - private final Optional<Boolean> announceServerSet; - private final Optional<String> loadAppraiserClassStr; - private final Optional<Boolean> thriftmux; - - DistributedLogServer(Optional<String> uri, - Optional<String> conf, - Optional<String> streamConf, - Optional<Integer> port, - Optional<Integer> statsPort, - Optional<Integer> shardId, - Optional<Boolean> announceServerSet, - Optional<String> loadAppraiserClass, - Optional<Boolean> thriftmux, - RoutingService routingService, - StatsReceiver statsReceiver, - StatsProvider statsProvider) { - this.uri = uri; - this.conf = conf; - this.streamConf = streamConf; - this.port = port; - this.statsPort = statsPort; - this.shardId = shardId; - this.announceServerSet = announceServerSet; - this.thriftmux = thriftmux; - this.routingService = routingService; - this.statsReceiver = statsReceiver; - this.statsProvider = statsProvider; - this.loadAppraiserClassStr = loadAppraiserClass; - } - - public void runServer() - throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException { - if (!uri.isPresent()) 
{ - throw new IllegalArgumentException("No distributedlog uri provided."); - } - URI dlUri = URI.create(uri.get()); - DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); - if (conf.isPresent()) { - String configFile = conf.get(); - try { - dlConf.loadConf(new File(configFile).toURI().toURL()); - } catch (ConfigurationException e) { - throw new IllegalArgumentException("Failed to load distributedlog configuration from " - + configFile + "."); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed " - + configFile + "."); - } - } - - this.configExecutorService = Executors.newScheduledThreadPool(1, - new ThreadFactoryBuilder() - .setNameFormat("DistributedLogService-Dyncfg-%d") - .setDaemon(true) - .build()); - - // server configuration and dynamic configuration - ServerConfiguration serverConf = new ServerConfiguration(); - serverConf.loadConf(dlConf); - - // overwrite the shard id if it is provided in the args - if (shardId.isPresent()) { - serverConf.setServerShardId(shardId.get()); - } - - serverConf.validate(); - - DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf); - - logger.info("Starting stats provider : {}", statsProvider.getClass()); - statsProvider.start(dlConf); - - if (announceServerSet.isPresent() && announceServerSet.get()) { - announcer = new ServerSetAnnouncer( - dlUri, - port.or(0), - statsPort.or(0), - shardId.or(0)); - } else { - announcer = new NOPAnnouncer(); - } - - // Build the stream partition converter - StreamPartitionConverter converter; - try { - converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass()); - } catch (ConfigurationException e) { - logger.warn("Failed to load configured stream-to-partition converter. 
Fallback to use {}", - IdentityStreamPartitionConverter.class.getName()); - converter = new IdentityStreamPartitionConverter(); - } - Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER)); - LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass); - logger.info("Load appraiser class is " + loadAppraiserClassStr.or("not specified.") + " Instantiated " - + loadAppraiser.getClass().getCanonicalName()); - - StreamConfigProvider streamConfProvider = - getStreamConfigProvider(dlConf, converter); - - // pre-run - preRun(dlConf, serverConf); - - Pair<DistributedLogServiceImpl, Server> serverPair = runServer( - serverConf, - dlConf, - dynDlConf, - dlUri, - converter, - routingService, - statsProvider, - port.or(0), - keepAliveLatch, - statsReceiver, - thriftmux.isPresent(), - streamConfProvider, - loadAppraiser); - - this.dlService = serverPair.getLeft(); - this.server = serverPair.getRight(); - - // announce the service - announcer.announce(); - // start the routing service after announced - routingService.startService(); - logger.info("Started the routing service."); - dlService.startPlacementPolicy(); - logger.info("Started the placement policy."); - } - - protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) { - this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs(); - if (!serverConf.isDurableWriteEnabled()) { - conf.setDurableWriteEnabled(false); - } - } - - private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf) - throws ConfigurationException { - Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent(); - if (conf.isPresent()) { - DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory( - configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS); - dynConf = configFactory.getDynamicConfiguration(conf.get()); - } - if (dynConf.isPresent()) { - 
return dynConf.get(); - } else { - return ConfUtils.getConstDynConf(dlConf); - } - } - - private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf, - StreamPartitionConverter partitionConverter) - throws ConfigurationException { - StreamConfigProvider streamConfProvider = new NullStreamConfigProvider(); - if (streamConf.isPresent() && conf.isPresent()) { - String dynConfigPath = streamConf.get(); - String defaultConfigFile = conf.get(); - streamConfProvider = new ServiceStreamConfigProvider( - dynConfigPath, - defaultConfigFile, - partitionConverter, - configExecutorService, - dlConf.getDynamicConfigReloadIntervalSec(), - TimeUnit.SECONDS); - } else if (conf.isPresent()) { - String configFile = conf.get(); - streamConfProvider = new DefaultStreamConfigProvider(configFile, configExecutorService, - dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS); - } - return streamConfProvider; - } - - static Pair<DistributedLogServiceImpl, Server> runServer( - ServerConfiguration serverConf, - DistributedLogConfiguration dlConf, - URI dlUri, - StreamPartitionConverter converter, - RoutingService routingService, - StatsProvider provider, - int port, - boolean thriftmux, - LoadAppraiser loadAppraiser) throws IOException { - - return runServer(serverConf, - dlConf, - ConfUtils.getConstDynConf(dlConf), - dlUri, - converter, - routingService, - provider, - port, - new CountDownLatch(0), - new NullStatsReceiver(), - thriftmux, - new NullStreamConfigProvider(), - loadAppraiser); - } - - static Pair<DistributedLogServiceImpl, Server> runServer( - ServerConfiguration serverConf, - DistributedLogConfiguration dlConf, - DynamicDistributedLogConfiguration dynDlConf, - URI dlUri, - StreamPartitionConverter partitionConverter, - RoutingService routingService, - StatsProvider provider, - int port, - CountDownLatch keepAliveLatch, - StatsReceiver statsReceiver, - boolean thriftmux, - StreamConfigProvider streamConfProvider, - LoadAppraiser loadAppraiser) 
throws IOException { - logger.info("Running server @ uri {}.", dlUri); - - boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled(); - StatsLogger perStreamStatsLogger; - if (perStreamStatsEnabled) { - perStreamStatsLogger = provider.getStatsLogger("stream"); - } else { - perStreamStatsLogger = NullStatsLogger.INSTANCE; - } - - // dl service - DistributedLogServiceImpl dlService = new DistributedLogServiceImpl( - serverConf, - dlConf, - dynDlConf, - streamConfProvider, - dlUri, - partitionConverter, - routingService, - provider.getStatsLogger(""), - perStreamStatsLogger, - keepAliveLatch, - loadAppraiser); - - StatsReceiver serviceStatsReceiver = statsReceiver.scope("service"); - StatsLogger serviceStatsLogger = provider.getStatsLogger("service"); - - ServerBuilder serverBuilder = ServerBuilder.get() - .name("DistributedLogServer") - .codec(ThriftServerFramedCodec.get()) - .reportTo(statsReceiver) - .keepAlive(true) - .bindTo(new InetSocketAddress(port)); - - if (thriftmux) { - logger.info("Using thriftmux."); - Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness( - Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk(); - serverBuilder = serverBuilder.stack( - ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2())); - } - - logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString()); - - // starts dl server - Server server = ServerBuilder.safeBuild( - new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen( - new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen( - new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))), - serverBuilder); - - logger.info("Started DistributedLog Server."); - return Pair.of(dlService, server); - } - - static void closeServer(Pair<DistributedLogServiceImpl, Server> pair, - long gracefulShutdownPeriod, - TimeUnit timeUnit) { - if (null 
!= pair.getLeft()) { - pair.getLeft().shutdown(); - if (gracefulShutdownPeriod > 0) { - try { - timeUnit.sleep(gracefulShutdownPeriod); - } catch (InterruptedException e) { - logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e); - } - } - } - if (null != pair.getRight()) { - logger.info("Closing dl thrift server."); - pair.getRight().close(); - logger.info("Closed dl thrift server."); - } - } - - /** - * Close the server. - */ - public void close() { - if (null != announcer) { - try { - announcer.unannounce(); - } catch (IOException e) { - logger.warn("Error on unannouncing service : ", e); - } - announcer.close(); - } - closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS); - routingService.stopService(); - if (null != statsProvider) { - statsProvider.stop(); - } - SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS); - keepAliveLatch.countDown(); - } - - public void join() throws InterruptedException { - keepAliveLatch.await(); - } - - /** - * Running distributedlog server. 
- * - * @param uri distributedlog namespace - * @param conf distributedlog configuration file location - * @param streamConf per stream configuration dir location - * @param port listen port - * @param statsPort stats port - * @param shardId shard id - * @param announceServerSet whether to announce itself to server set - * @param thriftmux flag to enable thrift mux - * @param statsReceiver receiver to receive finagle stats - * @param statsProvider provider to receive dl stats - * @return distributedlog server - * @throws ConfigurationException - * @throws IllegalArgumentException - * @throws IOException - * @throws ClassNotFoundException - */ - public static DistributedLogServer runServer( - Optional<String> uri, - Optional<String> conf, - Optional<String> streamConf, - Optional<Integer> port, - Optional<Integer> statsPort, - Optional<Integer> shardId, - Optional<Boolean> announceServerSet, - Optional<String> loadAppraiserClass, - Optional<Boolean> thriftmux, - RoutingService routingService, - StatsReceiver statsReceiver, - StatsProvider statsProvider) - throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException { - - final DistributedLogServer server = new DistributedLogServer( - uri, - conf, - streamConf, - port, - statsPort, - shardId, - announceServerSet, - loadAppraiserClass, - thriftmux, - routingService, - statsReceiver, - statsProvider); - - server.runServer(); - return server; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java deleted file mode 100644 index a1642f9..0000000 --- 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
+++ /dev/null
@@ -1,187 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.service;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.client.routing.RoutingService;
import org.apache.distributedlog.client.routing.RoutingUtils;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The launcher of the distributedlog proxy server.
 *
 * <p>Parses the command line, builds the stats provider and routing service, starts a
 * {@link DistributedLogServer} and blocks until that server is released, then closes it.
 */
public class DistributedLogServerApp {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);

    private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
    private final String[] args;
    private final Options options = new Options();

    /**
     * Creates the launcher and registers every supported command line option.
     *
     * @param args raw command line arguments, parsed later by {@link #run()}
     */
    private DistributedLogServerApp(String[] args) {
        this.args = args;

        // prepare options
        options.addOption("u", "uri", true, "DistributedLog URI");
        options.addOption("c", "conf", true, "DistributedLog Configuration File");
        options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
        options.addOption("p", "port", true, "DistributedLog Server Port");
        options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
        options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
        options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
        options.addOption("a", "announce", false, "ServerSet Path to Announce");
        options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
    }

    /** Prints the usage line followed by the description of every registered option. */
    private void printUsage() {
        HelpFormatter helpFormatter = new HelpFormatter();
        helpFormatter.printHelp(USAGE, options);
    }

    /**
     * Parses the arguments and runs the server.
     *
     * <p>Every failure is logged and terminates the JVM with exit code -1; argument and
     * configuration errors additionally print the usage.
     */
    private void run() {
        try {
            logger.info("Running distributedlog server : args = {}", Arrays.toString(args));
            BasicParser parser = new BasicParser();
            CommandLine cmdline = parser.parse(options, args);
            runCmd(cmdline);
        } catch (ParseException pe) {
            logger.error("Argument error : {}", pe.getMessage());
            printUsage();
            Runtime.getRuntime().exit(-1);
        } catch (IllegalArgumentException iae) {
            logger.error("Argument error : {}", iae.getMessage());
            // runCmd wraps configuration-loading failures in an IllegalArgumentException; surface the
            // underlying reason instead of dropping it.
            if (null != iae.getCause()) {
                logger.error("Argument error caused by : ", iae.getCause());
            }
            printUsage();
            Runtime.getRuntime().exit(-1);
        } catch (ConfigurationException ce) {
            logger.error("Configuration error : {}", ce.getMessage());
            printUsage();
            Runtime.getRuntime().exit(-1);
        } catch (IOException ie) {
            logger.error("Failed to start distributedlog server : ", ie);
            Runtime.getRuntime().exit(-1);
        } catch (ClassNotFoundException cnf) {
            logger.error("Failed to start distributedlog server : ", cnf);
            Runtime.getRuntime().exit(-1);
        }
    }

    /**
     * Builds and runs the server described by the parsed command line, then blocks until the server
     * is released and closes it.
     *
     * @param cmdline parsed command line
     * @throws IllegalArgumentException if no uri is given or the configuration file cannot be loaded
     *         (the original failure is attached as the cause)
     * @throws IOException declared by {@code DistributedLogServer.runServer}
     * @throws ConfigurationException declared by {@code DistributedLogServer.runServer}
     * @throws ClassNotFoundException declared by {@code DistributedLogServer.runServer}
     */
    private void runCmd(CommandLine cmdline)
            throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
        final StatsReceiver statsReceiver = NullStatsReceiver.get();
        Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
        // The configuration is loaded here only so that a bad file fails fast; the server itself
        // receives the file location (confOptional), not this object.
        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
        if (confOptional.isPresent()) {
            String configFile = confOptional.get();
            try {
                dlConf.loadConf(new File(configFile).toURI().toURL());
            } catch (ConfigurationException e) {
                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
                    + configFile + ".", e);
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
                    + configFile + ".", e);
            }
        }
        // load the stats provider
        final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
                .transform(new Function<String, StatsProvider>() {
                    @Nullable
                    @Override
                    public StatsProvider apply(@Nullable String name) {
                        return ReflectionUtils.newInstance(name, StatsProvider.class);
                    }
                }).or(new NullStatsProvider());

        final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
        checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
        URI dlUri = URI.create(uriOption.get());

        DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
        RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
                .statsReceiver(statsReceiver.scope("routing"))
                .build();

        final DistributedLogServer server = DistributedLogServer.runServer(
                uriOption,
                confOptional,
                getOptionalStringArg(cmdline, "sc"),
                getOptionalIntegerArg(cmdline, "p"),
                getOptionalIntegerArg(cmdline, "sp"),
                getOptionalIntegerArg(cmdline, "si"),
                getOptionalBooleanArg(cmdline, "a"),
                getOptionalStringArg(cmdline, "la"),
                getOptionalBooleanArg(cmdline, "mx"),
                routingService,
                statsReceiver,
                statsProvider);

        // NOTE(review): the server is closed both here and at the end of this method, so close()
        // can run twice when the JVM exits after a normal return -- confirm close() tolerates that.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info("Closing DistributedLog Server.");
                server.close();
                logger.info("Closed DistributedLog Server.");
                statsProvider.stop();
            }
        });

        try {
            server.join();
        } catch (InterruptedException e) {
            logger.warn("Interrupted when waiting distributedlog server to be finished : ", e);
        }

        logger.info("DistributedLog Service Interrupted.");
        server.close();
        logger.info("Closed DistributedLog Server.");
        statsProvider.stop();
    }

    public static void main(String[] args) {
        final DistributedLogServerApp launcher = new DistributedLogServerApp(args);
        launcher.run();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
deleted
file mode 100644 index c37cd53..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java +++ /dev/null @@ -1,794 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.common.net.InetSocketAddressHelper; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.acl.AccessControlManager; -import org.apache.distributedlog.client.resolver.DefaultRegionResolver; -import org.apache.distributedlog.client.resolver.RegionResolver; -import org.apache.distributedlog.client.routing.RoutingService; -import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.RegionUnavailableException; -import org.apache.distributedlog.exceptions.ServiceUnavailableException; -import 
org.apache.distributedlog.exceptions.StreamUnavailableException; -import org.apache.distributedlog.exceptions.TooManyStreamsException; -import org.apache.distributedlog.feature.AbstractFeatureProvider; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import org.apache.distributedlog.rate.MovingAverageRate; -import org.apache.distributedlog.rate.MovingAverageRateFactory; -import org.apache.distributedlog.service.config.ServerConfiguration; -import org.apache.distributedlog.service.config.StreamConfigProvider; -import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy; -import org.apache.distributedlog.service.placement.LoadAppraiser; -import org.apache.distributedlog.service.placement.PlacementPolicy; -import org.apache.distributedlog.service.placement.ZKPlacementStateManager; -import org.apache.distributedlog.service.stream.BulkWriteOp; -import org.apache.distributedlog.service.stream.DeleteOp; -import org.apache.distributedlog.service.stream.HeartbeatOp; -import org.apache.distributedlog.service.stream.ReleaseOp; -import org.apache.distributedlog.service.stream.Stream; -import org.apache.distributedlog.service.stream.StreamFactory; -import org.apache.distributedlog.service.stream.StreamFactoryImpl; -import org.apache.distributedlog.service.stream.StreamManager; -import org.apache.distributedlog.service.stream.StreamManagerImpl; -import org.apache.distributedlog.service.stream.StreamOp; -import org.apache.distributedlog.service.stream.StreamOpStats; -import org.apache.distributedlog.service.stream.TruncateOp; -import org.apache.distributedlog.service.stream.WriteOp; -import org.apache.distributedlog.service.stream.WriteOpWithPayload; -import org.apache.distributedlog.service.stream.admin.CreateOp; -import org.apache.distributedlog.service.stream.admin.StreamAdminOp; -import org.apache.distributedlog.service.stream.limiter.ServiceRequestLimiter; -import 
org.apache.distributedlog.service.streamset.StreamPartitionConverter; -import org.apache.distributedlog.service.utils.ServerUtils; -import org.apache.distributedlog.thrift.service.BulkWriteResponse; -import org.apache.distributedlog.thrift.service.ClientInfo; -import org.apache.distributedlog.thrift.service.DistributedLogService; -import org.apache.distributedlog.thrift.service.HeartbeatOptions; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.thrift.service.ServerInfo; -import org.apache.distributedlog.thrift.service.ServerStatus; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteContext; -import org.apache.distributedlog.thrift.service.WriteResponse; -import org.apache.distributedlog.util.ConfUtils; -import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.SchedulerUtils; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Function; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.ScheduledThreadPoolTimer; -import com.twitter.util.Timer; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.util.HashedWheelTimer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; - -/** - * Implementation of distributedlog thrift service. 
- */ -public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface, - FatalErrorHandler { - - private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class); - - private static final int MOVING_AVERAGE_WINDOW_SECS = 60; - - private final ServerConfiguration serverConfig; - private final DistributedLogConfiguration dlConfig; - private final DistributedLogNamespace dlNamespace; - private final int serverRegionId; - private final PlacementPolicy placementPolicy; - private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT; - private final ReentrantReadWriteLock closeLock = - new ReentrantReadWriteLock(); - private final CountDownLatch keepAliveLatch; - private final byte dlsnVersion; - private final String clientId; - private final OrderedScheduler scheduler; - private final AccessControlManager accessControlManager; - private final StreamConfigProvider streamConfigProvider; - private final StreamManager streamManager; - private final StreamFactory streamFactory; - private final RoutingService routingService; - private final RegionResolver regionResolver; - private final MovingAverageRateFactory movingAvgFactory; - private final MovingAverageRate windowedRps; - private final MovingAverageRate windowedBps; - private final ServiceRequestLimiter limiter; - private final Timer timer; - private final HashedWheelTimer requestTimer; - - // Features - private final FeatureProvider featureProvider; - private final Feature featureRegionStopAcceptNewStream; - private final Feature featureChecksumDisabled; - private final Feature limiterDisabledFeature; - - // Stats - private final StatsLogger statsLogger; - private final StatsLogger perStreamStatsLogger; - private final StreamPartitionConverter streamPartitionConverter; - private final StreamOpStats streamOpStats; - private final Counter bulkWritePendingStat; - private final Counter writePendingStat; - private final Counter redirects; - private final Counter 
receivedRecordCounter; - private final StatsLogger statusCodeStatLogger; - private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters = - new ConcurrentHashMap<StatusCode, Counter>(); - private final Counter statusCodeTotal; - private final Gauge<Number> proxyStatusGauge; - private final Gauge<Number> movingAvgRpsGauge; - private final Gauge<Number> movingAvgBpsGauge; - private final Gauge<Number> streamAcquiredGauge; - private final Gauge<Number> streamCachedGauge; - private final int shard; - - DistributedLogServiceImpl(ServerConfiguration serverConf, - DistributedLogConfiguration dlConf, - DynamicDistributedLogConfiguration dynDlConf, - StreamConfigProvider streamConfigProvider, - URI uri, - StreamPartitionConverter converter, - RoutingService routingService, - StatsLogger statsLogger, - StatsLogger perStreamStatsLogger, - CountDownLatch keepAliveLatch, - LoadAppraiser loadAppraiser) - throws IOException { - // Configuration. - this.serverConfig = serverConf; - this.dlConfig = dlConf; - this.perStreamStatsLogger = perStreamStatsLogger; - this.dlsnVersion = serverConf.getDlsnVersion(); - this.serverRegionId = serverConf.getRegionId(); - this.streamPartitionConverter = converter; - int serverPort = serverConf.getServerPort(); - this.shard = serverConf.getServerShardId(); - int numThreads = serverConf.getServerThreads(); - this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard); - String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName( - serverRegionId, - shard, - serverConf.isUseHostnameAsAllocatorPoolName()); - dlConf.setLedgerAllocatorPoolName(allocatorPoolName); - this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", dlConf, statsLogger.scope("features")); - if (this.featureProvider instanceof AbstractFeatureProvider) { - ((AbstractFeatureProvider) featureProvider).start(); - } - - // Build the namespace - this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(dlConf) 
- .uri(uri) - .statsLogger(statsLogger) - .featureProvider(this.featureProvider) - .clientId(clientId) - .regionId(serverRegionId) - .build(); - this.accessControlManager = this.dlNamespace.createAccessControlManager(); - this.keepAliveLatch = keepAliveLatch; - this.streamConfigProvider = streamConfigProvider; - - // Stats pertaining to stream op execution - this.streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); - - // Executor Service. - this.scheduler = OrderedScheduler.newBuilder() - .corePoolSize(numThreads) - .name("DistributedLogService-Executor") - .traceTaskExecution(true) - .statsLogger(statsLogger.scope("scheduler")) - .build(); - - // Timer, kept separate to ensure reliability of timeouts. - this.requestTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(), - dlConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, - dlConf.getTimeoutTimerNumTicks()); - - // Creating and managing Streams - this.streamFactory = new StreamFactoryImpl(clientId, - streamOpStats, - serverConf, - dlConf, - featureProvider, - streamConfigProvider, - converter, - dlNamespace, - scheduler, - this, - requestTimer); - this.streamManager = new StreamManagerImpl( - clientId, - dlConf, - scheduler, - streamFactory, - converter, - streamConfigProvider, - dlNamespace); - this.routingService = routingService; - this.regionResolver = new DefaultRegionResolver(); - - // Service features - this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature( - ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM.name().toLowerCase()); - this.featureChecksumDisabled = this.featureProvider.getFeature( - ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED.name().toLowerCase()); - this.limiterDisabledFeature = this.featureProvider.getFeature( - ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED.name().toLowerCase()); - - // Resource limiting - this.timer = new ScheduledThreadPoolTimer(1, "timer", true); - this.movingAvgFactory = new 
MovingAverageRateFactory(timer); - this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS); - this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS); - this.limiter = new ServiceRequestLimiter( - dynDlConf, - streamOpStats.baseScope("service_limiter"), - windowedRps, - windowedBps, - streamManager, - limiterDisabledFeature); - - this.placementPolicy = new LeastLoadPlacementPolicy( - loadAppraiser, - routingService, - dlNamespace, - new ZKPlacementStateManager(uri, dlConf, statsLogger), - Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()), - statsLogger); - logger.info("placement started"); - - // Stats - this.statsLogger = statsLogger; - - // Gauges for server status/health - this.proxyStatusGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() - ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 
1 : 2)); - } - }; - this.movingAvgRpsGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return windowedRps.get(); - } - }; - this.movingAvgBpsGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return windowedBps.get(); - } - }; - // Gauges for streams - this.streamAcquiredGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return streamManager.numAcquired(); - } - }; - this.streamCachedGauge = new Gauge<Number>() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return streamManager.numCached(); - } - }; - - // Stats on server - statsLogger.registerGauge("proxy_status", proxyStatusGauge); - // Global moving average rps - statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge); - // Global moving average bps - statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge); - // Stats on requests - this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending"); - this.writePendingStat = streamOpStats.requestPendingCounter("writePending"); - this.redirects = streamOpStats.requestCounter("redirect"); - this.statusCodeStatLogger = streamOpStats.requestScope("statuscode"); - this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count"); - this.receivedRecordCounter = streamOpStats.recordsCounter("received"); - - // Stats for streams - StatsLogger streamsStatsLogger = statsLogger.scope("streams"); - streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge); - streamsStatsLogger.registerGauge("cached", this.streamCachedGauge); - - // Setup complete - logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}," - + " dlsn version {}.", - new Object[] { clientId, 
allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion }); - } - - private void countStatusCode(StatusCode code) { - Counter counter = statusCodeCounters.get(code); - if (null == counter) { - counter = statusCodeStatLogger.getCounter(code.name()); - Counter oldCounter = statusCodeCounters.putIfAbsent(code, counter); - if (null != oldCounter) { - counter = oldCounter; - } - } - counter.inc(); - statusCodeTotal.inc(); - } - - @Override - public Future<ServerInfo> handshake() { - return handshakeWithClientInfo(new ClientInfo()); - } - - @Override - public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) { - ServerInfo serverInfo = new ServerInfo(); - closeLock.readLock().lock(); - try { - serverInfo.setServerStatus(serverStatus); - } finally { - closeLock.readLock().unlock(); - } - - if (clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships()) { - return Future.value(serverInfo); - } - - Optional<String> regex = Optional.absent(); - if (clientInfo.isSetStreamNameRegex()) { - regex = Optional.of(clientInfo.getStreamNameRegex()); - } - - Map<String, String> ownershipMap = streamManager.getStreamOwnershipMap(regex); - serverInfo.setOwnerships(ownershipMap); - return Future.value(serverInfo); - } - - @VisibleForTesting - Stream getLogWriter(String stream) throws IOException { - Stream writer = streamManager.getStream(stream); - if (null == writer) { - closeLock.readLock().lock(); - try { - if (featureRegionStopAcceptNewStream.isAvailable()) { - // accept new stream is disabled in current dc - throw new RegionUnavailableException("Region is unavailable right now."); - } else if (!(ServerStatus.WRITE_AND_ACCEPT == serverStatus)) { - // if it is closed, we would not acquire stream again. 
- return null; - } - writer = streamManager.getOrCreateStream(stream, true); - } finally { - closeLock.readLock().unlock(); - } - } - return writer; - } - - // Service interface methods - - @Override - public Future<WriteResponse> write(final String stream, ByteBuffer data) { - receivedRecordCounter.inc(); - return doWrite(stream, data, null /* checksum */, false); - } - - @Override - public Future<BulkWriteResponse> writeBulkWithContext(final String stream, - List<ByteBuffer> data, - WriteContext ctx) { - bulkWritePendingStat.inc(); - receivedRecordCounter.add(data.size()); - BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter, - getChecksum(ctx), featureChecksumDisabled, accessControlManager); - executeStreamOp(op); - return op.result().ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - bulkWritePendingStat.dec(); - return null; - } - }); - } - - @Override - public Future<WriteResponse> writeWithContext(final String stream, ByteBuffer data, WriteContext ctx) { - return doWrite(stream, data, getChecksum(ctx), ctx.isIsRecordSet()); - } - - @Override - public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) { - HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx), - featureChecksumDisabled, accessControlManager); - executeStreamOp(op); - return op.result(); - } - - @Override - public Future<WriteResponse> heartbeatWithOptions(String stream, WriteContext ctx, HeartbeatOptions options) { - HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx), - featureChecksumDisabled, accessControlManager); - if (options.isSendHeartBeatToReader()) { - op.setWriteControlRecord(true); - } - executeStreamOp(op); - return op.result(); - } - - @Override - public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) { - TruncateOp op = new TruncateOp( - stream, - 
DLSN.deserialize(dlsn), - statsLogger, - perStreamStatsLogger, - getChecksum(ctx), - featureChecksumDisabled, - accessControlManager); - executeStreamOp(op); - return op.result(); - } - - @Override - public Future<WriteResponse> delete(String stream, WriteContext ctx) { - DeleteOp op = new DeleteOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx), - featureChecksumDisabled, accessControlManager); - executeStreamOp(op); - return op.result(); - } - - @Override - public Future<WriteResponse> release(String stream, WriteContext ctx) { - ReleaseOp op = new ReleaseOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx), - featureChecksumDisabled, accessControlManager); - executeStreamOp(op); - return op.result(); - } - - @Override - public Future<WriteResponse> create(String stream, WriteContext ctx) { - CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled); - return executeStreamAdminOp(op); - } - - // - // Ownership RPC - // - - @Override - public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) { - if (streamManager.isAcquired(streamName)) { - // the stream is already acquired - return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId))); - } - - return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() { - @Override - public WriteResponse apply(String server) { - String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1); - return new WriteResponse(ResponseUtils.ownerToHeader(host)); - } - }); - } - - - // - // Admin RPCs - // - - @Override - public Future<Void> setAcceptNewStream(boolean enabled) { - closeLock.writeLock().lock(); - try { - logger.info("Set AcceptNewStream = {}", enabled); - if (ServerStatus.DOWN != serverStatus) { - if (enabled) { - serverStatus = ServerStatus.WRITE_AND_ACCEPT; - } else { - serverStatus = ServerStatus.WRITE_ONLY; - } - } - } finally { - 
closeLock.writeLock().unlock(); - } - return Future.Void(); - } - - private Future<WriteResponse> doWrite(final String name, - ByteBuffer data, - Long checksum, - boolean isRecordSet) { - writePendingStat.inc(); - receivedRecordCounter.inc(); - WriteOp op = newWriteOp(name, data, checksum, isRecordSet); - executeStreamOp(op); - return op.result().ensure(new Function0<BoxedUnit>() { - public BoxedUnit apply() { - writePendingStat.dec(); - return null; - } - }); - } - - private Long getChecksum(WriteContext ctx) { - return ctx.isSetCrc32() ? ctx.getCrc32() : null; - } - - private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) { - try { - op.preExecute(); - } catch (DLException dle) { - return Future.exception(dle); - } - return op.execute(); - } - - private void executeStreamOp(final StreamOp op) { - - // Must attach this as early as possible--returning before this point will cause us to - // lose the status code. - op.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() { - @Override - public void onSuccess(ResponseHeader header) { - if (header.getLocation() != null || header.getCode() == StatusCode.FOUND) { - redirects.inc(); - } - countStatusCode(header.getCode()); - } - @Override - public void onFailure(Throwable cause) { - } - }); - - try { - // Apply the request limiter - limiter.apply(op); - - // Execute per-op pre-exec code - op.preExecute(); - - } catch (TooManyStreamsException e) { - // Translate to StreamUnavailableException to ensure that the client will redirect - // to a different host. Ideally we would be able to return TooManyStreamsException, - // but the way exception handling works right now we can't control the handling in - // the client because client changes deploy very slowly. 
- op.fail(new StreamUnavailableException(e.getMessage())); - return; - } catch (Exception e) { - op.fail(e); - return; - } - - Stream stream; - try { - stream = getLogWriter(op.streamName()); - } catch (RegionUnavailableException rue) { - // redirect the requests to other region - op.fail(new RegionUnavailableException("Region " + serverRegionId + " is unavailable.")); - return; - } catch (IOException e) { - op.fail(e); - return; - } - if (null == stream) { - // redirect the requests when stream is unavailable. - op.fail(new ServiceUnavailableException("Server " + clientId + " is closed.")); - return; - } - - if (op instanceof WriteOpWithPayload) { - WriteOpWithPayload writeOp = (WriteOpWithPayload) op; - windowedBps.add(writeOp.getPayloadSize()); - windowedRps.inc(); - } - - stream.submit(op); - } - - void shutdown() { - try { - closeLock.writeLock().lock(); - try { - if (ServerStatus.DOWN == serverStatus) { - return; - } - serverStatus = ServerStatus.DOWN; - } finally { - closeLock.writeLock().unlock(); - } - - streamManager.close(); - movingAvgFactory.close(); - limiter.close(); - - Stopwatch closeStreamsStopwatch = Stopwatch.createStarted(); - - Future<List<Void>> closeResult = streamManager.closeStreams(); - logger.info("Waiting for closing all streams ..."); - try { - Await.result(closeResult, Duration.fromTimeUnit(5, TimeUnit.MINUTES)); - logger.info("Closed all streams in {} millis.", - closeStreamsStopwatch.elapsed(TimeUnit.MILLISECONDS)); - } catch (InterruptedException e) { - logger.warn("Interrupted on waiting for closing all streams : ", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e); - } - - // shutdown the dl namespace - logger.info("Closing distributedlog namespace ..."); - dlNamespace.close(); - logger.info("Closed distributedlog namespace ."); - - // Stop the feature provider - if (this.featureProvider instanceof AbstractFeatureProvider) { - 
((AbstractFeatureProvider) featureProvider).stop(); - } - - // Stop the timer. - timer.stop(); - placementPolicy.close(); - - // clean up gauge - unregisterGauge(); - - // shutdown the executor after requesting closing streams. - SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS); - } catch (Exception ex) { - logger.info("Exception while shutting down distributedlog service."); - } finally { - // release the keepAliveLatch in case shutdown is called from a shutdown hook. - keepAliveLatch.countDown(); - logger.info("Finished shutting down distributedlog service."); - } - } - - protected void startPlacementPolicy() { - this.placementPolicy.start(shard == 0); - } - - @Override - public void notifyFatalError() { - triggerShutdown(); - } - - private void triggerShutdown() { - // release the keepAliveLatch to let the main thread shutdown the whole service. - logger.info("Releasing KeepAlive Latch to trigger shutdown ..."); - keepAliveLatch.countDown(); - logger.info("Released KeepAlive Latch. Main thread will shut the service down."); - } - - // Test methods. - - private DynamicDistributedLogConfiguration getDynConf(String streamName) { - Optional<DynamicDistributedLogConfiguration> dynDlConf = - streamConfigProvider.getDynamicStreamConfig(streamName); - if (dynDlConf.isPresent()) { - return dynDlConf.get(); - } else { - return ConfUtils.getConstDynConf(dlConfig); - } - } - - /** - * clean up the gauge before we close to help GC. 
- */ - private void unregisterGauge(){ - this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge); - this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge); - this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge); - this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge); - this.statsLogger.unregisterGauge("cached", this.streamCachedGauge); - } - - @VisibleForTesting - Stream newStream(String name) throws IOException { - return streamManager.getOrCreateStream(name, false); - } - - @VisibleForTesting - WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum) { - return newWriteOp(stream, data, checksum, false); - } - - @VisibleForTesting - RoutingService getRoutingService() { - return this.routingService; - } - - @VisibleForTesting - DLSocketAddress getServiceAddress() throws IOException { - return DLSocketAddress.deserialize(clientId); - } - - WriteOp newWriteOp(String stream, - ByteBuffer data, - Long checksum, - boolean isRecordSet) { - return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter, - serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled, - accessControlManager); - } - - @VisibleForTesting - Future<List<Void>> closeStreams() { - return streamManager.closeStreams(); - } - - @VisibleForTesting - public DistributedLogNamespace getDistributedLogNamespace() { - return dlNamespace; - } - - @VisibleForTesting - StreamManager getStreamManager() { - return streamManager; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java deleted file 
mode 100644 index 17b5ab3..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -/** - * Implement handling for an unrecoverable error. - */ -public interface FatalErrorHandler { - - /** - * This method is invoked when an unrecoverable error has occurred - * and no progress can be made. It should implement a shutdown routine. - */ - void notifyFatalError(); -}