http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java new file mode 100644 index 0000000..96bc338 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ClientUtils.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import org.apache.distributedlog.client.DistributedLogClientImpl; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.commons.lang3.tuple.Pair; + +/** + * DistributedLog Client Related Utils. 
+ */
+public class ClientUtils {
+
+    /**
+     * Build a client from the given builder and expose the single resulting instance
+     * through both of the interfaces that callers need.
+     *
+     * @param builder
+     *          builder used to create the underlying {@link DistributedLogClientImpl}
+     * @return pair whose left element is the built client cast to {@link DistributedLogClient}
+     *         and whose right element is the same instance cast to {@link MonitorServiceClient}
+     */
+    public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
+        final DistributedLogClientImpl impl = builder.buildClient();
+        // both views are backed by the very same object
+        final DistributedLogClient logClient = (DistributedLogClient) impl;
+        final MonitorServiceClient monitorClient = (MonitorServiceClient) impl;
+        return Pair.of(logClient, monitorClient);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java new file mode 100644 index 0000000..9cc085d --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogCluster.java @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.distributedlog.service;
+
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.LocalDLMEmulator;
+import org.apache.distributedlog.client.routing.SingleHostRoutingService;
+import org.apache.distributedlog.impl.metadata.BKDLConfig;
+import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
+import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
+import com.twitter.finagle.builder.Server;
+import java.io.File;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.LocalBookKeeper;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DistributedLog Cluster is an emulator to run distributedlog components.
+ */
+public class DistributedLogCluster {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
+
+    /**
+     * Create a builder to configure and build a cluster.
+     *
+     * @return a new builder populated with the default settings
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder to build distributedlog cluster.
+     */
+    public static class Builder {
+
+        int numBookies = 3;
+        boolean shouldStartZK = true;
+        String zkHost = "127.0.0.1";
+        int zkPort = 0;
+        boolean shouldStartProxy = true;
+        int proxyPort = 7000;
+        boolean thriftmux = false;
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
+                .setLockTimeout(10)
+                .setOutputBufferSize(0)
+                .setImmediateFlushEnabled(true);
+        ServerConfiguration bkConf = new ServerConfiguration();
+
+        private Builder() {}
+
+        /**
+         * How many bookies to run. By default is 3.
+         *
+         * @param numBookies
+         *          number of bookies to run
+         * @return builder
+         */
+        public Builder numBookies(int numBookies) {
+            this.numBookies = numBookies;
+            return this;
+        }
+
+        /**
+         * Whether to start zookeeper? By default is true.
+         *
+         * @param startZK
+         *          flag to start zookeeper?
+         * @return builder
+         */
+        public Builder shouldStartZK(boolean startZK) {
+            this.shouldStartZK = startZK;
+            return this;
+        }
+
+        /**
+         * ZooKeeper server to run. By default it runs locally on '127.0.0.1'.
+         *
+         * @param zkServers
+         *          zk servers
+         * @return builder
+         */
+        public Builder zkServers(String zkServers) {
+            this.zkHost = zkServers;
+            return this;
+        }
+
+        /**
+         * ZooKeeper server port to listen on. By default it is 0, in which case a cluster
+         * that starts its own zookeeper launches it on any available port.
+         *
+         * @param zkPort
+         *          zookeeper server port.
+         * @return builder.
+         */
+        public Builder zkPort(int zkPort) {
+            this.zkPort = zkPort;
+            return this;
+        }
+
+        /**
+         * Whether to start proxy or not. By default is true.
+         *
+         * @param startProxy
+         *          whether to start proxy or not.
+         * @return builder
+         */
+        public Builder shouldStartProxy(boolean startProxy) {
+            this.shouldStartProxy = startProxy;
+            return this;
+        }
+
+        /**
+         * Port that proxy server to listen on. By default is 7000.
+         *
+         * @param proxyPort
+         *          port that proxy server to listen on.
+         * @return builder
+         */
+        public Builder proxyPort(int proxyPort) {
+            this.proxyPort = proxyPort;
+            return this;
+        }
+
+        /**
+         * Set the distributedlog configuration.
+         *
+         * @param dlConf
+         *          distributedlog configuration
+         * @return builder
+         */
+        public Builder dlConf(DistributedLogConfiguration dlConf) {
+            this.dlConf = dlConf;
+            return this;
+        }
+
+        /**
+         * Set the Bookkeeper server configuration.
+         *
+         * @param bkConf
+         *          bookkeeper server configuration
+         * @return builder
+         */
+        public Builder bkConf(ServerConfiguration bkConf) {
+            this.bkConf = bkConf;
+            return this;
+        }
+
+        /**
+         * Enable thriftmux for the dl server.
+         *
+         * @param enabled flag to enable thriftmux
+         * @return builder
+         */
+        public Builder thriftmux(boolean enabled) {
+            this.thriftmux = enabled;
+            return this;
+        }
+
+        /**
+         * Build the cluster from the current settings. When zookeeper is to be started,
+         * it is launched while the cluster object is constructed; bookies and the proxy
+         * are only started by {@link DistributedLogCluster#start()}.
+         *
+         * @return the distributedlog cluster
+         * @throws Exception if constructing the cluster fails
+         */
+        public DistributedLogCluster build() throws Exception {
+            // build the cluster
+            return new DistributedLogCluster(
+                dlConf,
+                bkConf,
+                numBookies,
+                shouldStartZK,
+                zkHost,
+                zkPort,
+                shouldStartProxy,
+                proxyPort,
+                thriftmux);
+        }
+    }
+
+    /**
+     * Run a distributedlog proxy server.
+     */
+    public static class DLServer {
+
+        static final int MAX_RETRIES = 20;
+        static final int MIN_PORT = 1025;
+        static final int MAX_PORT = 65535;
+
+        int proxyPort;
+
+        public final InetSocketAddress address;
+        public final Pair<DistributedLogServiceImpl, Server> dlServer;
+        private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
+
+        /**
+         * Start a proxy server, beginning at <i>basePort</i> and moving on to the next port
+         * (wrapping from {@link #MAX_PORT} to {@link #MIN_PORT}) each time a
+         * {@link BindException} is caught, for at most {@link #MAX_RETRIES} retries.
+         *
+         * @param dlConf distributedlog configuration
+         * @param uri distributedlog namespace uri
+         * @param basePort first port to try
+         * @param thriftmux flag to enable thriftmux
+         * @throws Exception if the server cannot be started
+         */
+        protected DLServer(DistributedLogConfiguration dlConf,
+                           URI uri,
+                           int basePort,
+                           boolean thriftmux) throws Exception {
+            proxyPort = basePort;
+
+            boolean success = false;
+            int retries = 0;
+            Pair<DistributedLogServiceImpl, Server> serverPair = null;
+            while (!success) {
+                try {
+                    org.apache.distributedlog.service.config.ServerConfiguration serverConf =
+                            new org.apache.distributedlog.service.config.ServerConfiguration();
+                    serverConf.loadConf(dlConf);
+                    // the port doubles as the shard id of this emulated server
+                    serverConf.setServerShardId(proxyPort);
+                    serverPair = DistributedLogServer.runServer(
+                            serverConf,
+                            dlConf,
+                            uri,
+                            new IdentityStreamPartitionConverter(),
+                            routingService,
+                            new NullStatsProvider(),
+                            proxyPort,
+                            thriftmux,
+                            new EqualLoadAppraiser());
+                    routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
+                    routingService.startService();
+                    serverPair.getLeft().startPlacementPolicy();
+                    success = true;
+                } catch (BindException be) {
+                    retries++;
+                    if (retries > MAX_RETRIES) {
+                        throw be;
+                    }
+                    proxyPort++;
+                    if (proxyPort > MAX_PORT) {
+                        proxyPort = MIN_PORT;
+                    }
+                }
+            }
+
+            LOG.info("Running DL on port {}", proxyPort);
+
+            dlServer = serverPair;
+            address = DLSocketAddress.getSocketAddress(proxyPort);
+        }
+
+        public InetSocketAddress getAddress() {
+            return address;
+        }
+
+        /**
+         * Close the proxy server and stop its routing service. The routing service is
+         * stopped even if closing the server throws.
+         */
+        public void shutdown() {
+            try {
+                DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
+            } finally {
+                routingService.stopService();
+            }
+        }
+    }
+
+    private final DistributedLogConfiguration dlConf;
+    private final ZooKeeperServerShim zks;
+    private final LocalDLMEmulator dlmEmulator;
+    private DLServer dlServer;
+    private final boolean shouldStartProxy;
+    private final int proxyPort;
+    private final boolean thriftmux;
+    private final List<File> tmpDirs = new ArrayList<File>();
+
+    private DistributedLogCluster(DistributedLogConfiguration dlConf,
+                                  ServerConfiguration bkConf,
+                                  int numBookies,
+                                  boolean shouldStartZK,
+                                  String zkServers,
+                                  int zkPort,
+                                  boolean shouldStartProxy,
+                                  int proxyPort,
+                                  boolean thriftmux) throws Exception {
+        this.dlConf = dlConf;
+        if (shouldStartZK) {
+            File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
+            tmpDirs.add(zkTmpDir);
+            if (0 == zkPort) {
+                // no port requested: let zookeeper pick one and remember it for the emulator
+                Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
+                this.zks = serverAndPort.getLeft();
+                zkPort = serverAndPort.getRight();
+            } else {
+                this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
+            }
+        } else {
+            this.zks = null;
+        }
+        // zookeeper (if any) is owned by this class, so the emulator must not start its own
+        this.dlmEmulator = LocalDLMEmulator.newBuilder()
+                .numBookies(numBookies)
+                .zkHost(zkServers)
+                .zkPort(zkPort)
+                .serverConf(bkConf)
+                .shouldStartZK(false)
+                .build();
+        this.shouldStartProxy = shouldStartProxy;
+        this.proxyPort = proxyPort;
+        this.thriftmux = thriftmux;
+    }
+
+    /**
+     * Start the emulator, create the namespace metadata and, if configured, the proxy server.
+     *
+     * @throws Exception if any component fails to start
+     */
+    public void start() throws Exception {
+        this.dlmEmulator.start();
+        BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
+        DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
+        if (shouldStartProxy) {
+            this.dlServer = new DLServer(
+                    dlConf,
+                    this.dlmEmulator.getUri(),
+                    proxyPort,
+                    thriftmux);
+        } else {
+            this.dlServer = null;
+        }
+    }
+
+    /**
+     * Stop the proxy server (if started), the emulator and zookeeper (if started by this
+     * cluster), then schedule the temp directories for deletion on JVM exit.
+     *
+     * <p>Every step is attempted even if an earlier one throws, so that a failing proxy
+     * shutdown does not leave zookeeper running or temp directories behind.
+     *
+     * @throws Exception if any of the shutdown steps fails
+     */
+    public void stop() throws Exception {
+        try {
+            if (null != dlServer) {
+                this.dlServer.shutdown();
+            }
+        } finally {
+            try {
+                this.dlmEmulator.teardown();
+            } finally {
+                try {
+                    if (null != this.zks) {
+                        this.zks.stop();
+                    }
+                } finally {
+                    for (File dir : tmpDirs) {
+                        FileUtils.forceDeleteOnExit(dir);
+                    }
+                }
+            }
+        }
+    }
+
+    public URI getUri() {
+        return this.dlmEmulator.getUri();
+    }
+
+    public String getZkServers() {
+        return this.dlmEmulator.getZkServers();
+    }
+
+    /**
+     * Get the finagle name string of the proxy: the address of the running proxy server,
+     * or '127.0.0.1' with the configured proxy port when no proxy server was started.
+     *
+     * @return finagle name string prefixed with 'inet!'
+     */
+    public String getProxyFinagleStr() {
+        // NOTE(review): InetSocketAddress#toString() renders the InetAddress part as
+        // "hostname/literal-ip"; confirm the resulting string is accepted by the finagle resolver.
+        return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
new file mode 100644
index 0000000..81e476b
--- /dev/null
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.config.DynamicConfigurationFactory; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.service.announcer.Announcer; +import org.apache.distributedlog.service.announcer.NOPAnnouncer; +import org.apache.distributedlog.service.announcer.ServerSetAnnouncer; +import org.apache.distributedlog.service.config.DefaultStreamConfigProvider; +import org.apache.distributedlog.service.config.NullStreamConfigProvider; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.config.ServiceStreamConfigProvider; +import org.apache.distributedlog.service.config.StreamConfigProvider; +import org.apache.distributedlog.service.placement.EqualLoadAppraiser; +import org.apache.distributedlog.service.placement.LoadAppraiser; +import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.thrift.service.DistributedLogService; +import 
org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.util.SchedulerUtils; +import com.twitter.finagle.Stack; +import com.twitter.finagle.ThriftMuxServer$; +import com.twitter.finagle.builder.Server; +import com.twitter.finagle.builder.ServerBuilder; +import com.twitter.finagle.stats.NullStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.finagle.thrift.ClientIdRequiredFilter; +import com.twitter.finagle.thrift.ThriftServerFramedCodec; +import com.twitter.finagle.transport.Transport; +import com.twitter.util.Duration; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; + +/** + * Running the distributedlog proxy server. 
+ */
+public class DistributedLogServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+    private static final String DEFAULT_LOAD_APPRAISER = EqualLoadAppraiser.class.getCanonicalName();
+
+    private DistributedLogServiceImpl dlService = null;
+    private Server server = null;
+    private RoutingService routingService;
+    private StatsProvider statsProvider;
+    private Announcer announcer = null;
+    private ScheduledExecutorService configExecutorService;
+    private long gracefulShutdownMs = 0L;
+
+    private final StatsReceiver statsReceiver;
+    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
+    private final Optional<String> uri;
+    private final Optional<String> conf;
+    private final Optional<String> streamConf;
+    private final Optional<Integer> port;
+    private final Optional<Integer> statsPort;
+    private final Optional<Integer> shardId;
+    private final Optional<Boolean> announceServerSet;
+    private final Optional<String> loadAppraiserClassStr;
+    private final Optional<Boolean> thriftmux;
+
+    DistributedLogServer(Optional<String> uri,
+                         Optional<String> conf,
+                         Optional<String> streamConf,
+                         Optional<Integer> port,
+                         Optional<Integer> statsPort,
+                         Optional<Integer> shardId,
+                         Optional<Boolean> announceServerSet,
+                         Optional<String> loadAppraiserClass,
+                         Optional<Boolean> thriftmux,
+                         RoutingService routingService,
+                         StatsReceiver statsReceiver,
+                         StatsProvider statsProvider) {
+        this.uri = uri;
+        this.conf = conf;
+        this.streamConf = streamConf;
+        this.port = port;
+        this.statsPort = statsPort;
+        this.shardId = shardId;
+        this.announceServerSet = announceServerSet;
+        this.thriftmux = thriftmux;
+        this.routingService = routingService;
+        this.statsReceiver = statsReceiver;
+        this.statsProvider = statsProvider;
+        this.loadAppraiserClassStr = loadAppraiserClass;
+    }
+
+    /**
+     * Load the configuration, start the stats provider, build and start the proxy server,
+     * then announce it and start the routing service and the placement policy.
+     *
+     * @throws ConfigurationException propagated from validating the server configuration or
+     *          building the dynamic / per stream configuration
+     * @throws IllegalArgumentException if no uri is provided or the configuration file cannot be loaded
+     * @throws IOException propagated from starting the server
+     * @throws ClassNotFoundException if the load appraiser class cannot be found
+     */
+    public void runServer()
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+        if (!uri.isPresent()) {
+            throw new IllegalArgumentException("No distributedlog uri provided.");
+        }
+        URI dlUri = URI.create(uri.get());
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (conf.isPresent()) {
+            String configFile = conf.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                // keep the cause so that the actual parsing problem is not lost
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+                    + configFile + ".", e);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+                    + configFile + ".", e);
+            }
+        }
+
+        this.configExecutorService = Executors.newScheduledThreadPool(1,
+            new ThreadFactoryBuilder()
+                .setNameFormat("DistributedLogService-Dyncfg-%d")
+                .setDaemon(true)
+                .build());
+
+        // server configuration and dynamic configuration
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.loadConf(dlConf);
+
+        // overwrite the shard id if it is provided in the args
+        if (shardId.isPresent()) {
+            serverConf.setServerShardId(shardId.get());
+        }
+
+        serverConf.validate();
+
+        DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf);
+
+        logger.info("Starting stats provider : {}", statsProvider.getClass());
+        statsProvider.start(dlConf);
+
+        if (announceServerSet.isPresent() && announceServerSet.get()) {
+            announcer = new ServerSetAnnouncer(
+                    dlUri,
+                    port.or(0),
+                    statsPort.or(0),
+                    shardId.or(0));
+        } else {
+            announcer = new NOPAnnouncer();
+        }
+
+        // Build the stream partition converter
+        StreamPartitionConverter converter;
+        try {
+            converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
+        } catch (ConfigurationException e) {
+            // the trailing throwable is logged by slf4j in addition to the formatted message
+            logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
+                    IdentityStreamPartitionConverter.class.getName(), e);
+            converter = new IdentityStreamPartitionConverter();
+        }
+        Class<?> loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRAISER));
+        LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
+        logger.info("Load appraiser class is {} Instantiated {}",
+                loadAppraiserClassStr.or("not specified."),
+                loadAppraiser.getClass().getCanonicalName());
+
+        StreamConfigProvider streamConfProvider =
+                getStreamConfigProvider(dlConf, converter);
+
+        // pre-run
+        preRun(dlConf, serverConf);
+
+        Pair<DistributedLogServiceImpl, Server> serverPair = runServer(
+                serverConf,
+                dlConf,
+                dynDlConf,
+                dlUri,
+                converter,
+                routingService,
+                statsProvider,
+                port.or(0),
+                keepAliveLatch,
+                statsReceiver,
+                // honor the value of the flag, consistent with announceServerSet above:
+                // an explicit 'false' must not enable thriftmux
+                thriftmux.or(false),
+                streamConfProvider,
+                loadAppraiser);
+
+        this.dlService = serverPair.getLeft();
+        this.server = serverPair.getRight();
+
+        // announce the service
+        announcer.announce();
+        // start the routing service after announced
+        routingService.startService();
+        logger.info("Started the routing service.");
+        dlService.startPlacementPolicy();
+        logger.info("Started the placement policy.");
+    }
+
+    /**
+     * Hook invoked right before the server is built: records the graceful shutdown period
+     * and disables durable write on <i>conf</i> when the server configuration disables it.
+     *
+     * @param conf distributedlog configuration
+     * @param serverConf server configuration
+     */
+    protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
+        this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs();
+        if (!serverConf.isDurableWriteEnabled()) {
+            conf.setDurableWriteEnabled(false);
+        }
+    }
+
+    private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
+        throws ConfigurationException {
+        Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
+        if (conf.isPresent()) {
+            DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
+                    configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
+            dynConf = configFactory.getDynamicConfiguration(conf.get());
+        }
+        if (dynConf.isPresent()) {
+            return dynConf.get();
+        } else {
+            // no configuration file (or nothing loaded from it): fall back to a constant view of dlConf
+            return ConfUtils.getConstDynConf(dlConf);
+        }
+    }
+
+    private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf,
+                                                         StreamPartitionConverter partitionConverter)
+            throws ConfigurationException {
+        StreamConfigProvider streamConfProvider = new NullStreamConfigProvider();
+        if (streamConf.isPresent() && conf.isPresent()) {
+            String dynConfigPath = streamConf.get();
+            String defaultConfigFile = conf.get();
+            streamConfProvider = new ServiceStreamConfigProvider(
+                    dynConfigPath,
+                    defaultConfigFile,
+                    partitionConverter,
+                    configExecutorService,
+                    dlConf.getDynamicConfigReloadIntervalSec(),
+                    TimeUnit.SECONDS);
+        } else if (conf.isPresent()) {
+            String configFile = conf.get();
+            streamConfProvider = new DefaultStreamConfigProvider(configFile, configExecutorService,
+                    dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
+        }
+        return streamConfProvider;
+    }
+
+    /**
+     * Run a server with a constant dynamic configuration, no finagle stats, no per stream
+     * configuration and a keep-alive latch that is already released.
+     */
+    static Pair<DistributedLogServiceImpl, Server> runServer(
+            ServerConfiguration serverConf,
+            DistributedLogConfiguration dlConf,
+            URI dlUri,
+            StreamPartitionConverter converter,
+            RoutingService routingService,
+            StatsProvider provider,
+            int port,
+            boolean thriftmux,
+            LoadAppraiser loadAppraiser) throws IOException {
+
+        return runServer(serverConf,
+                dlConf,
+                ConfUtils.getConstDynConf(dlConf),
+                dlUri,
+                converter,
+                routingService,
+                provider,
+                port,
+                new CountDownLatch(0),
+                new NullStatsReceiver(),
+                thriftmux,
+                new NullStreamConfigProvider(),
+                loadAppraiser);
+    }
+
+    /**
+     * Create the service implementation and build the finagle server that exposes it on <i>port</i>.
+     *
+     * @return pair of the service implementation (left) and the finagle server (right)
+     */
+    static Pair<DistributedLogServiceImpl, Server> runServer(
+            ServerConfiguration serverConf,
+            DistributedLogConfiguration dlConf,
+            DynamicDistributedLogConfiguration dynDlConf,
+            URI dlUri,
+            StreamPartitionConverter partitionConverter,
+            RoutingService routingService,
+            StatsProvider provider,
+            int port,
+            CountDownLatch keepAliveLatch,
+            StatsReceiver statsReceiver,
+            boolean thriftmux,
+            StreamConfigProvider streamConfProvider,
+            LoadAppraiser loadAppraiser) throws IOException {
+        logger.info("Running server @ uri {}.", dlUri);
+
+        boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
+        StatsLogger perStreamStatsLogger;
+        if (perStreamStatsEnabled) {
+            perStreamStatsLogger = provider.getStatsLogger("stream");
+        } else {
+            perStreamStatsLogger = NullStatsLogger.INSTANCE;
+        }
+
+        // dl service
+        DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
+            serverConf,
+            dlConf,
+            dynDlConf,
+            streamConfProvider,
+            dlUri,
+            partitionConverter,
+            routingService,
+            provider.getStatsLogger(""),
+            perStreamStatsLogger,
+            keepAliveLatch,
+            loadAppraiser);
+
+        StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
+        StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
+
+        ServerBuilder serverBuilder = ServerBuilder.get()
+                .name("DistributedLogServer")
+                .codec(ThriftServerFramedCodec.get())
+                .reportTo(statsReceiver)
+                .keepAlive(true)
+                .bindTo(new InetSocketAddress(port));
+
+        if (thriftmux) {
+            logger.info("Using thriftmux.");
+            Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
+                    Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
+            serverBuilder = serverBuilder.stack(
+                    ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
+        }
+
+        logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
+
+        // starts dl server
+        Server server = ServerBuilder.safeBuild(
+                new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen(
+                    new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen(
+                        new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))),
+                serverBuilder);
+
+        logger.info("Started DistributedLog Server.");
+        return Pair.of(dlService, server);
+    }
+
+    /**
+     * Shut down the service, wait for the graceful shutdown period (if positive) and then
+     * close the thrift server. Either element of the pair may be null.
+     *
+     * @param pair service implementation (left) and finagle server (right)
+     * @param gracefulShutdownPeriod time to wait between shutting down the service and closing the server
+     * @param timeUnit unit of <i>gracefulShutdownPeriod</i>
+     */
+    static void closeServer(Pair<DistributedLogServiceImpl, Server> pair,
+                            long gracefulShutdownPeriod,
+                            TimeUnit timeUnit) {
+        if (null != pair.getLeft()) {
+            pair.getLeft().shutdown();
+            if (gracefulShutdownPeriod > 0) {
+                try {
+                    timeUnit.sleep(gracefulShutdownPeriod);
+                } catch (InterruptedException e) {
+                    logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
+                    // restore the interrupt status so that callers can still observe the interruption
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+        if (null != pair.getRight()) {
+            logger.info("Closing dl thrift server.");
+            pair.getRight().close();
+            logger.info("Closed dl thrift server.");
+        }
+    }
+
+    /**
+     * Close the server.
+     */
+    public void close() {
+        if (null != announcer) {
+            try {
+                announcer.unannounce();
+            } catch (IOException e) {
+                logger.warn("Error on unannouncing service : ", e);
+            }
+            announcer.close();
+        }
+        closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
+        routingService.stopService();
+        if (null != statsProvider) {
+            statsProvider.stop();
+        }
+        SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS);
+        // release the threads blocked in join()
+        keepAliveLatch.countDown();
+    }
+
+    /**
+     * Block until the keep-alive latch of this server is released.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public void join() throws InterruptedException {
+        keepAliveLatch.await();
+    }
+
+    /**
+     * Running distributedlog server.
+     *
+     * @param uri distributedlog namespace
+     * @param conf distributedlog configuration file location
+     * @param streamConf per stream configuration dir location
+     * @param port listen port
+     * @param statsPort stats port
+     * @param shardId shard id
+     * @param announceServerSet whether to announce itself to server set
+     * @param loadAppraiserClass class name of the load appraiser to instantiate
+     * @param thriftmux flag to enable thrift mux
+     * @param routingService routing service used by the server
+     * @param statsReceiver receiver to receive finagle stats
+     * @param statsProvider provider to receive dl stats
+     * @return distributedlog server
+     * @throws ConfigurationException propagated from validating or building the configuration
+     * @throws IllegalArgumentException if no uri is provided or the configuration file cannot be loaded
+     * @throws IOException propagated from starting the server
+     * @throws ClassNotFoundException if the load appraiser class cannot be found
+     */
+    public static DistributedLogServer runServer(
+               Optional<String> uri,
+               Optional<String> conf,
+               Optional<String> streamConf,
+               Optional<Integer> port,
+               Optional<Integer> statsPort,
+               Optional<Integer> shardId,
+               Optional<Boolean> announceServerSet,
+               Optional<String> loadAppraiserClass,
+               Optional<Boolean> thriftmux,
+               RoutingService routingService,
+               StatsReceiver statsReceiver,
+               StatsProvider statsProvider)
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+
+        final DistributedLogServer server = new DistributedLogServer(
+                uri,
+                conf,
+                streamConf,
+                port,
+                statsPort,
+                shardId,
+                announceServerSet,
+                loadAppraiserClass,
+                thriftmux,
+                routingService,
+                statsReceiver,
+                statsProvider);
+
+        server.runServer();
+        return server;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java
new file mode 100644
index 0000000..a1642f9
--- /dev/null
+++
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServerApp.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg; +import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg; +import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.client.routing.RoutingUtils; +import org.apache.distributedlog.client.serverset.DLZkServerSet; +import com.twitter.finagle.stats.NullStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import 
org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The launcher of the distributedlog proxy server.
+ */
+public class DistributedLogServerApp {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
+
+    private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
+    private final String[] args;
+    private final Options options = new Options();
+
+    private DistributedLogServerApp(String[] args) {
+        this.args = args;
+
+        // prepare options
+        options.addOption("u", "uri", true, "DistributedLog URI");
+        options.addOption("c", "conf", true, "DistributedLog Configuration File");
+        options.addOption("sc", "stream-conf", true, "Per Stream Configuration Directory");
+        options.addOption("p", "port", true, "DistributedLog Server Port");
+        options.addOption("sp", "stats-port", true, "DistributedLog Stats Port");
+        options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider");
+        options.addOption("si", "shard-id", true, "DistributedLog Shard ID");
+        options.addOption("a", "announce", false, "ServerSet Path to Announce");
+        options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use");
+        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
+    }
+
+    private void printUsage() {
+        HelpFormatter helpFormatter = new HelpFormatter();
+        helpFormatter.printHelp(USAGE, options);
+    }
+
+    /**
+     * Parse the command line and run the server. Any failure is logged and terminates
+     * the JVM with exit code -1; argument and configuration errors also print the usage.
+     */
+    private void run() {
+        try {
+            logger.info("Running distributedlog server : args = {}", Arrays.toString(args));
+            BasicParser parser = new BasicParser();
+            CommandLine cmdline = parser.parse(options, args);
+            runCmd(cmdline);
+        } catch (ParseException pe) {
+            logger.error("Argument error : {}", pe.getMessage());
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IllegalArgumentException iae) {
+            logger.error("Argument error : {}", iae.getMessage());
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (ConfigurationException ce) {
+            logger.error("Configuration error : {}", ce.getMessage());
+            printUsage();
+            Runtime.getRuntime().exit(-1);
+        } catch (IOException ie) {
+            logger.error("Failed to start distributedlog server : ", ie);
+            Runtime.getRuntime().exit(-1);
+        } catch (ClassNotFoundException cnf) {
+            logger.error("Failed to start distributedlog server : ", cnf);
+            Runtime.getRuntime().exit(-1);
+        }
+    }
+
+    /**
+     * Start the server described by the parsed command line, register a shutdown hook
+     * that closes it, and block until the server finishes.
+     *
+     * @param cmdline parsed command line
+     * @throws IllegalArgumentException if the uri is missing or the configuration file cannot be loaded
+     * @throws IOException propagated from starting the server
+     * @throws ConfigurationException propagated from starting the server
+     * @throws ClassNotFoundException propagated from starting the server
+     */
+    private void runCmd(CommandLine cmdline)
+        throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
+        final StatsReceiver statsReceiver = NullStatsReceiver.get();
+        Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
+        if (confOptional.isPresent()) {
+            String configFile = confOptional.get();
+            try {
+                dlConf.loadConf(new File(configFile).toURI().toURL());
+            } catch (ConfigurationException e) {
+                // keep the cause so that the actual parsing problem is not lost
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+                    + configFile + ".", e);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+                    + configFile + ".", e);
+            }
+        }
+        // load the stats provider
+        final StatsProvider statsProvider = getOptionalStringArg(cmdline, "pd")
+                .transform(new Function<String, StatsProvider>() {
+                    @Nullable
+                    @Override
+                    public StatsProvider apply(@Nullable String name) {
+                        return ReflectionUtils.newInstance(name, StatsProvider.class);
+                    }
+                }).or(new NullStatsProvider());
+
+        final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
+        checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+        URI dlUri = URI.create(uriOption.get());
+
+        DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
+        RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet())
+                .statsReceiver(statsReceiver.scope("routing"))
+                .build();
+
+        final DistributedLogServer server = DistributedLogServer.runServer(
+                uriOption,
+                confOptional,
+                getOptionalStringArg(cmdline, "sc"),
+                getOptionalIntegerArg(cmdline, "p"),
+                getOptionalIntegerArg(cmdline, "sp"),
+                getOptionalIntegerArg(cmdline, "si"),
+                getOptionalBooleanArg(cmdline, "a"),
+                getOptionalStringArg(cmdline, "la"),
+                getOptionalBooleanArg(cmdline, "mx"),
+                routingService,
+                statsReceiver,
+                statsProvider);
+
+        // close the server when the JVM is asked to terminate
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                logger.info("Closing DistributedLog Server.");
+                server.close();
+                logger.info("Closed DistributedLog Server.");
+                statsProvider.stop();
+            }
+        });
+
+        try {
+            server.join();
+        } catch (InterruptedException e) {
+            logger.warn("Interrupted when waiting distributedlog server to be finished : ", e);
+        }
+
+        logger.info("DistributedLog Service Interrupted.");
+        server.close();
+        logger.info("Closed DistributedLog Server.");
+        statsProvider.stop();
+    }
+
+    public static void main(String[] args) {
+        final DistributedLogServerApp launcher = new DistributedLogServerApp(args);
+        launcher.run();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java new file mode 100644 index 0000000..c37cd53 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java @@ -0,0 +1,794 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.net.InetSocketAddressHelper; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.client.resolver.DefaultRegionResolver; +import org.apache.distributedlog.client.resolver.RegionResolver; +import org.apache.distributedlog.client.routing.RoutingService; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.RegionUnavailableException; +import org.apache.distributedlog.exceptions.ServiceUnavailableException; +import org.apache.distributedlog.exceptions.StreamUnavailableException; +import org.apache.distributedlog.exceptions.TooManyStreamsException; +import org.apache.distributedlog.feature.AbstractFeatureProvider; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.rate.MovingAverageRate; +import org.apache.distributedlog.rate.MovingAverageRateFactory; +import org.apache.distributedlog.service.config.ServerConfiguration; +import org.apache.distributedlog.service.config.StreamConfigProvider; +import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy; +import org.apache.distributedlog.service.placement.LoadAppraiser; +import org.apache.distributedlog.service.placement.PlacementPolicy; +import org.apache.distributedlog.service.placement.ZKPlacementStateManager; +import org.apache.distributedlog.service.stream.BulkWriteOp; +import org.apache.distributedlog.service.stream.DeleteOp; +import 
org.apache.distributedlog.service.stream.HeartbeatOp; +import org.apache.distributedlog.service.stream.ReleaseOp; +import org.apache.distributedlog.service.stream.Stream; +import org.apache.distributedlog.service.stream.StreamFactory; +import org.apache.distributedlog.service.stream.StreamFactoryImpl; +import org.apache.distributedlog.service.stream.StreamManager; +import org.apache.distributedlog.service.stream.StreamManagerImpl; +import org.apache.distributedlog.service.stream.StreamOp; +import org.apache.distributedlog.service.stream.StreamOpStats; +import org.apache.distributedlog.service.stream.TruncateOp; +import org.apache.distributedlog.service.stream.WriteOp; +import org.apache.distributedlog.service.stream.WriteOpWithPayload; +import org.apache.distributedlog.service.stream.admin.CreateOp; +import org.apache.distributedlog.service.stream.admin.StreamAdminOp; +import org.apache.distributedlog.service.stream.limiter.ServiceRequestLimiter; +import org.apache.distributedlog.service.streamset.StreamPartitionConverter; +import org.apache.distributedlog.service.utils.ServerUtils; +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.ClientInfo; +import org.apache.distributedlog.thrift.service.DistributedLogService; +import org.apache.distributedlog.thrift.service.HeartbeatOptions; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.ServerInfo; +import org.apache.distributedlog.thrift.service.ServerStatus; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteContext; +import org.apache.distributedlog.thrift.service.WriteResponse; +import org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.SchedulerUtils; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import 
com.twitter.util.Function;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.ScheduledThreadPoolTimer;
import com.twitter.util.Timer;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

/**
 * Implementation of distributedlog thrift service.
 *
 * <p>Each stream request is wrapped into a stream op, checked against the request limiter and
 * submitted to the stream it targets. The server status is guarded by {@code closeLock}: it is
 * updated under the write lock and consulted under the read lock before a new stream is acquired.
 */
public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
                                                  FatalErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);

    private static final int MOVING_AVERAGE_WINDOW_SECS = 60;

    private final ServerConfiguration serverConfig;
    private final DistributedLogConfiguration dlConfig;
    private final DistributedLogNamespace dlNamespace;
    private final int serverRegionId;
    private final PlacementPolicy placementPolicy;
    // Written under closeLock; volatile because the proxy status gauge samples it without the lock.
    private volatile ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
    private final ReentrantReadWriteLock closeLock =
            new ReentrantReadWriteLock();
    private final CountDownLatch keepAliveLatch;
    private final byte dlsnVersion;
    private final String clientId;
    private final OrderedScheduler scheduler;
    private final AccessControlManager accessControlManager;
    private final StreamConfigProvider streamConfigProvider;
    private final StreamManager streamManager;
    private final StreamFactory streamFactory;
    private final RoutingService routingService;
    private final RegionResolver regionResolver;
    private final MovingAverageRateFactory movingAvgFactory;
    private final MovingAverageRate windowedRps;
    private final MovingAverageRate windowedBps;
    private final ServiceRequestLimiter limiter;
    private final Timer timer;
    private final HashedWheelTimer requestTimer;

    // Features
    private final FeatureProvider featureProvider;
    private final Feature featureRegionStopAcceptNewStream;
    private final Feature featureChecksumDisabled;
    private final Feature limiterDisabledFeature;

    // Stats
    private final StatsLogger statsLogger;
    // Scope the stream gauges are registered on; kept so they can be unregistered from the same scope.
    private final StatsLogger streamsStatsLogger;
    private final StatsLogger perStreamStatsLogger;
    private final StreamPartitionConverter streamPartitionConverter;
    private final StreamOpStats streamOpStats;
    private final Counter bulkWritePendingStat;
    private final Counter writePendingStat;
    private final Counter redirects;
    private final Counter receivedRecordCounter;
    private final StatsLogger statusCodeStatLogger;
    private final ConcurrentHashMap<StatusCode, Counter> statusCodeCounters =
            new ConcurrentHashMap<StatusCode, Counter>();
    private final Counter statusCodeTotal;
    private final Gauge<Number> proxyStatusGauge;
    private final Gauge<Number> movingAvgRpsGauge;
    private final Gauge<Number> movingAvgBpsGauge;
    private final Gauge<Number> streamAcquiredGauge;
    private final Gauge<Number> streamCachedGauge;
    private final int shard;

    /**
     * Builds the service: namespace, scheduler, timers, stream factory/manager, request limiter,
     * placement policy and all gauges/counters.
     *
     * @param serverConf server configuration (port, shard id, region id, thread count, ...)
     * @param dlConf distributedlog configuration; its ledger allocator pool name is overwritten here
     * @param dynDlConf dynamic configuration handed to the service request limiter
     * @param streamConfigProvider provider of per stream configuration
     * @param uri namespace uri
     * @param converter stream partition converter
     * @param routingService routing service used by the placement policy
     * @param statsLogger stats logger for server level stats
     * @param perStreamStatsLogger stats logger for per stream stats
     * @param keepAliveLatch latch released to let the main thread shut the service down
     * @param loadAppraiser load appraiser used by the placement policy
     * @throws IOException declared by the construction path (namespace, access control manager, ...)
     */
    DistributedLogServiceImpl(ServerConfiguration serverConf,
                              DistributedLogConfiguration dlConf,
                              DynamicDistributedLogConfiguration dynDlConf,
                              StreamConfigProvider streamConfigProvider,
                              URI uri,
                              StreamPartitionConverter converter,
                              RoutingService routingService,
                              StatsLogger statsLogger,
                              StatsLogger perStreamStatsLogger,
                              CountDownLatch keepAliveLatch,
                              LoadAppraiser loadAppraiser)
            throws IOException {
        // Configuration.
        this.serverConfig = serverConf;
        this.dlConfig = dlConf;
        this.perStreamStatsLogger = perStreamStatsLogger;
        this.dlsnVersion = serverConf.getDlsnVersion();
        this.serverRegionId = serverConf.getRegionId();
        this.streamPartitionConverter = converter;
        int serverPort = serverConf.getServerPort();
        this.shard = serverConf.getServerShardId();
        int numThreads = serverConf.getServerThreads();
        this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard);
        String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName(
            serverRegionId,
            shard,
            serverConf.isUseHostnameAsAllocatorPoolName());
        dlConf.setLedgerAllocatorPoolName(allocatorPoolName);
        this.featureProvider = AbstractFeatureProvider.getFeatureProvider("", dlConf, statsLogger.scope("features"));
        if (this.featureProvider instanceof AbstractFeatureProvider) {
            ((AbstractFeatureProvider) featureProvider).start();
        }

        // Build the namespace
        this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
                .conf(dlConf)
                .uri(uri)
                .statsLogger(statsLogger)
                .featureProvider(this.featureProvider)
                .clientId(clientId)
                .regionId(serverRegionId)
                .build();
        this.accessControlManager = this.dlNamespace.createAccessControlManager();
        this.keepAliveLatch = keepAliveLatch;
        this.streamConfigProvider = streamConfigProvider;

        // Stats pertaining to stream op execution
        this.streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);

        // Executor Service.
        this.scheduler = OrderedScheduler.newBuilder()
                .corePoolSize(numThreads)
                .name("DistributedLogService-Executor")
                .traceTaskExecution(true)
                .statsLogger(statsLogger.scope("scheduler"))
                .build();

        // Timer, kept separate to ensure reliability of timeouts.
        this.requestTimer = new HashedWheelTimer(
            new ThreadFactoryBuilder().setNameFormat("DLServiceTimer-%d").build(),
            dlConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
            dlConf.getTimeoutTimerNumTicks());

        // Creating and managing Streams
        this.streamFactory = new StreamFactoryImpl(clientId,
                streamOpStats,
                serverConf,
                dlConf,
                featureProvider,
                streamConfigProvider,
                converter,
                dlNamespace,
                scheduler,
                this,
                requestTimer);
        this.streamManager = new StreamManagerImpl(
                clientId,
                dlConf,
                scheduler,
                streamFactory,
                converter,
                streamConfigProvider,
                dlNamespace);
        this.routingService = routingService;
        this.regionResolver = new DefaultRegionResolver();

        // Service features
        this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature(
                featureName(ServerFeatureKeys.REGION_STOP_ACCEPT_NEW_STREAM));
        this.featureChecksumDisabled = this.featureProvider.getFeature(
                featureName(ServerFeatureKeys.SERVICE_CHECKSUM_DISABLED));
        this.limiterDisabledFeature = this.featureProvider.getFeature(
                featureName(ServerFeatureKeys.SERVICE_GLOBAL_LIMITER_DISABLED));

        // Resource limiting
        this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
        this.movingAvgFactory = new MovingAverageRateFactory(timer);
        this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
        this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
        this.limiter = new ServiceRequestLimiter(
                dynDlConf,
                streamOpStats.baseScope("service_limiter"),
                windowedRps,
                windowedBps,
                streamManager,
                limiterDisabledFeature);

        this.placementPolicy = new LeastLoadPlacementPolicy(
            loadAppraiser,
            routingService,
            dlNamespace,
            new ZKPlacementStateManager(uri, dlConf, statsLogger),
            Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()),
            statsLogger);
        logger.info("placement started");

        // Stats
        this.statsLogger = statsLogger;

        // Gauges for server status/health:
        // -1 = down, 3 = region stopped accepting new streams, 1 = write and accept, 2 = otherwise
        this.proxyStatusGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
                    ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
            }
        };
        this.movingAvgRpsGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return windowedRps.get();
            }
        };
        this.movingAvgBpsGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return windowedBps.get();
            }
        };
        // Gauges for streams
        this.streamAcquiredGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return streamManager.numAcquired();
            }
        };
        this.streamCachedGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return streamManager.numCached();
            }
        };

        // Stats on server
        statsLogger.registerGauge("proxy_status", proxyStatusGauge);
        // Global moving average rps
        statsLogger.registerGauge("moving_avg_rps", movingAvgRpsGauge);
        // Global moving average bps
        statsLogger.registerGauge("moving_avg_bps", movingAvgBpsGauge);
        // Stats on requests
        this.bulkWritePendingStat = streamOpStats.requestPendingCounter("bulkWritePending");
        this.writePendingStat = streamOpStats.requestPendingCounter("writePending");
        this.redirects = streamOpStats.requestCounter("redirect");
        this.statusCodeStatLogger = streamOpStats.requestScope("statuscode");
        this.statusCodeTotal = streamOpStats.requestCounter("statuscode_count");
        this.receivedRecordCounter = streamOpStats.recordsCounter("received");

        // Stats for streams
        this.streamsStatsLogger = statsLogger.scope("streams");
        streamsStatsLogger.registerGauge("acquired", this.streamAcquiredGauge);
        streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);

        // Setup complete
        logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
            + " dlsn version {}.",
            new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
    }

    /**
     * Returns the feature name of a server feature key.
     *
     * <p>Lowercased with {@link Locale#ROOT} so the name does not depend on the JVM default locale.
     */
    private static String featureName(ServerFeatureKeys key) {
        return key.name().toLowerCase(Locale.ROOT);
    }

    /** Bumps the counter of the given status code (created on first use) and the total counter. */
    private void countStatusCode(StatusCode code) {
        Counter counter = statusCodeCounters.get(code);
        if (null == counter) {
            counter = statusCodeStatLogger.getCounter(code.name());
            Counter oldCounter = statusCodeCounters.putIfAbsent(code, counter);
            if (null != oldCounter) {
                // lost the race: use the counter that was published first
                counter = oldCounter;
            }
        }
        counter.inc();
        statusCodeTotal.inc();
    }

    @Override
    public Future<ServerInfo> handshake() {
        return handshakeWithClientInfo(new ClientInfo());
    }

    /**
     * Returns the current server status and, unless the client explicitly opted out of
     * ownerships, the stream ownership map (filtered by the client's stream name regex if set).
     */
    @Override
    public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
        ServerInfo serverInfo = new ServerInfo();
        closeLock.readLock().lock();
        try {
            serverInfo.setServerStatus(serverStatus);
        } finally {
            closeLock.readLock().unlock();
        }

        if (clientInfo.isSetGetOwnerships() && !clientInfo.isGetOwnerships()) {
            return Future.value(serverInfo);
        }

        Optional<String> regex = Optional.absent();
        if (clientInfo.isSetStreamNameRegex()) {
            regex = Optional.of(clientInfo.getStreamNameRegex());
        }

        Map<String, String> ownershipMap = streamManager.getStreamOwnershipMap(regex);
        serverInfo.setOwnerships(ownershipMap);
        return Future.value(serverInfo);
    }

    /**
     * Returns the stream for the given name, creating it if the server still accepts new streams.
     *
     * @param stream stream name
     * @return the stream, or {@code null} if it is not known yet and the server status is not
     *         {@code WRITE_AND_ACCEPT}
     * @throws RegionUnavailableException if the stream is not known yet and the region stopped
     *         accepting new streams
     * @throws IOException declared by the stream creation path
     */
    @VisibleForTesting
    Stream getLogWriter(String stream) throws IOException {
        Stream writer = streamManager.getStream(stream);
        if (null == writer) {
            closeLock.readLock().lock();
            try {
                if (featureRegionStopAcceptNewStream.isAvailable()) {
                    // accept new stream is disabled in current dc
                    throw new RegionUnavailableException("Region is unavailable right now.");
                } else if (!(ServerStatus.WRITE_AND_ACCEPT == serverStatus)) {
                    // if it is closed, we would not acquire stream again.
                    return null;
                }
                writer = streamManager.getOrCreateStream(stream, true);
            } finally {
                closeLock.readLock().unlock();
            }
        }
        return writer;
    }

    // Service interface methods

    @Override
    public Future<WriteResponse> write(final String stream, ByteBuffer data) {
        // The received record is counted in doWrite(); counting it here as well would
        // report every plain write twice.
        return doWrite(stream, data, null /* checksum */, false);
    }

    @Override
    public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
                                                          List<ByteBuffer> data,
                                                          WriteContext ctx) {
        bulkWritePendingStat.inc();
        receivedRecordCounter.add(data.size());
        BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
            getChecksum(ctx), featureChecksumDisabled, accessControlManager);
        executeStreamOp(op);
        // the pending counter is released whether the op succeeds or fails
        return op.result().ensure(new Function0<BoxedUnit>() {
            public BoxedUnit apply() {
                bulkWritePendingStat.dec();
                return BoxedUnit.UNIT;
            }
        });
    }

    @Override
    public Future<WriteResponse> writeWithContext(final String stream, ByteBuffer data, WriteContext ctx) {
        return doWrite(stream, data, getChecksum(ctx), ctx.isIsRecordSet());
    }

    @Override
    public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
        HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
            featureChecksumDisabled, accessControlManager);
        executeStreamOp(op);
        return op.result();
    }

    @Override
    public Future<WriteResponse> heartbeatWithOptions(String stream, WriteContext ctx, HeartbeatOptions options) {
        HeartbeatOp op = new HeartbeatOp(stream, statsLogger, perStreamStatsLogger, dlsnVersion, getChecksum(ctx),
            featureChecksumDisabled, accessControlManager);
        if (options.isSendHeartBeatToReader()) {
            op.setWriteControlRecord(true);
        }
        executeStreamOp(op);
        return op.result();
    }

    @Override
    public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
        TruncateOp op = new TruncateOp(
            stream,
            DLSN.deserialize(dlsn),
            statsLogger,
            perStreamStatsLogger,
            getChecksum(ctx),
            featureChecksumDisabled,
            accessControlManager);
        executeStreamOp(op);
        return op.result();
    }

    @Override
    public Future<WriteResponse> delete(String stream, WriteContext ctx) {
        DeleteOp op = new DeleteOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
            featureChecksumDisabled, accessControlManager);
        executeStreamOp(op);
        return op.result();
    }

    @Override
    public Future<WriteResponse> release(String stream, WriteContext ctx) {
        ReleaseOp op = new ReleaseOp(stream, statsLogger, perStreamStatsLogger, streamManager, getChecksum(ctx),
            featureChecksumDisabled, accessControlManager);
        executeStreamOp(op);
        return op.result();
    }

    @Override
    public Future<WriteResponse> create(String stream, WriteContext ctx) {
        CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled);
        return executeStreamAdminOp(op);
    }

    //
    // Ownership RPC
    //

    /**
     * Answers with this server if it already acquired the stream, otherwise with the server
     * chosen by the placement policy.
     */
    @Override
    public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) {
        if (streamManager.isAcquired(streamName)) {
            // the stream is already acquired
            return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId)));
        }

        return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() {
            @Override
            public WriteResponse apply(String server) {
                String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1);
                return new WriteResponse(ResponseUtils.ownerToHeader(host));
            }
        });
    }


    //
    // Admin RPCs
    //

    /**
     * Switches the server between {@code WRITE_AND_ACCEPT} and {@code WRITE_ONLY}.
     * Has no effect once the server is {@code DOWN}.
     */
    @Override
    public Future<Void> setAcceptNewStream(boolean enabled) {
        closeLock.writeLock().lock();
        try {
            logger.info("Set AcceptNewStream = {}", enabled);
            if (ServerStatus.DOWN != serverStatus) {
                if (enabled) {
                    serverStatus = ServerStatus.WRITE_AND_ACCEPT;
                } else {
                    serverStatus = ServerStatus.WRITE_ONLY;
                }
            }
        } finally {
            closeLock.writeLock().unlock();
        }
        return Future.Void();
    }

    /**
     * Common path of the single record write RPCs: accounts for the pending request and the
     * received record, submits the write op and releases the pending counter on completion.
     */
    private Future<WriteResponse> doWrite(final String name,
                                          ByteBuffer data,
                                          Long checksum,
                                          boolean isRecordSet) {
        writePendingStat.inc();
        receivedRecordCounter.inc();
        WriteOp op = newWriteOp(name, data, checksum, isRecordSet);
        executeStreamOp(op);
        return op.result().ensure(new Function0<BoxedUnit>() {
            public BoxedUnit apply() {
                writePendingStat.dec();
                return BoxedUnit.UNIT;
            }
        });
    }

    /** Returns the crc32 carried by the write context, or {@code null} if none is set. */
    private Long getChecksum(WriteContext ctx) {
        return ctx.isSetCrc32() ? ctx.getCrc32() : null;
    }

    /** Runs the pre-execution step of an admin op and then executes it. */
    private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
        try {
            op.preExecute();
        } catch (DLException dle) {
            return Future.exception(dle);
        }
        return op.execute();
    }

    /**
     * Applies the request limiter and the op's pre-execution step, resolves the target stream
     * and submits the op to it. Every failure on the way is reported through {@code op.fail}.
     */
    private void executeStreamOp(final StreamOp op) {

        // Must attach this as early as possible--returning before this point will cause us to
        // lose the status code.
        op.responseHeader().addEventListener(new FutureEventListener<ResponseHeader>() {
            @Override
            public void onSuccess(ResponseHeader header) {
                if (header.getLocation() != null || header.getCode() == StatusCode.FOUND) {
                    redirects.inc();
                }
                countStatusCode(header.getCode());
            }
            @Override
            public void onFailure(Throwable cause) {
                // nothing to count: no response header is available on failure
            }
        });

        try {
            // Apply the request limiter
            limiter.apply(op);

            // Execute per-op pre-exec code
            op.preExecute();

        } catch (TooManyStreamsException e) {
            // Translate to StreamUnavailableException to ensure that the client will redirect
            // to a different host. Ideally we would be able to return TooManyStreamsException,
            // but the way exception handling works right now we can't control the handling in
            // the client because client changes deploy very slowly.
            op.fail(new StreamUnavailableException(e.getMessage()));
            return;
        } catch (Exception e) {
            op.fail(e);
            return;
        }

        Stream stream;
        try {
            stream = getLogWriter(op.streamName());
        } catch (RegionUnavailableException rue) {
            // redirect the requests to other region
            op.fail(new RegionUnavailableException("Region " + serverRegionId + " is unavailable."));
            return;
        } catch (IOException e) {
            op.fail(e);
            return;
        }
        if (null == stream) {
            // redirect the requests when stream is unavailable.
            op.fail(new ServiceUnavailableException("Server " + clientId + " is closed."));
            return;
        }

        if (op instanceof WriteOpWithPayload) {
            WriteOpWithPayload writeOp = (WriteOpWithPayload) op;
            windowedBps.add(writeOp.getPayloadSize());
            windowedRps.inc();
        }

        stream.submit(op);
    }

    /**
     * Shuts the service down: marks the server {@code DOWN}, closes the streams (waiting up to
     * five minutes), the namespace, the feature provider, the timer, the placement policy and
     * the scheduler. Calling it again once the server is {@code DOWN} only releases the
     * keep-alive latch.
     */
    void shutdown() {
        try {
            closeLock.writeLock().lock();
            try {
                if (ServerStatus.DOWN == serverStatus) {
                    return;
                }
                serverStatus = ServerStatus.DOWN;
            } finally {
                closeLock.writeLock().unlock();
            }

            streamManager.close();
            movingAvgFactory.close();
            limiter.close();

            Stopwatch closeStreamsStopwatch = Stopwatch.createStarted();

            Future<List<Void>> closeResult = streamManager.closeStreams();
            logger.info("Waiting for closing all streams ...");
            try {
                Await.result(closeResult, Duration.fromTimeUnit(5, TimeUnit.MINUTES));
                logger.info("Closed all streams in {} millis.",
                        closeStreamsStopwatch.elapsed(TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                logger.warn("Interrupted on waiting for closing all streams : ", e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.warn("Sorry, we didn't close all streams gracefully in 5 minutes : ", e);
            }

            // shutdown the dl namespace
            logger.info("Closing distributedlog namespace ...");
            dlNamespace.close();
            logger.info("Closed distributedlog namespace .");

            // Stop the feature provider
            if (this.featureProvider instanceof AbstractFeatureProvider) {
                ((AbstractFeatureProvider) featureProvider).stop();
            }

            // Stop the timer.
            // NOTE(review): requestTimer (the HashedWheelTimer) is not stopped here -- confirm it is
            // stopped elsewhere.
            timer.stop();
            placementPolicy.close();

            // clean up gauge
            unregisterGauge();

            // shutdown the executor after requesting closing streams.
            SchedulerUtils.shutdownScheduler(scheduler, 60, TimeUnit.SECONDS);
        } catch (Exception ex) {
            // keep the cause: without it a failed shutdown cannot be diagnosed
            logger.error("Exception while shutting down distributedlog service.", ex);
        } finally {
            // release the keepAliveLatch in case shutdown is called from a shutdown hook.
            keepAliveLatch.countDown();
            logger.info("Finished shutting down distributedlog service.");
        }
    }

    /** Starts the placement policy; the flag passed to it is true only on shard 0. */
    protected void startPlacementPolicy() {
        this.placementPolicy.start(shard == 0);
    }

    @Override
    public void notifyFatalError() {
        triggerShutdown();
    }

    private void triggerShutdown() {
        // release the keepAliveLatch to let the main thread shutdown the whole service.
        logger.info("Releasing KeepAlive Latch to trigger shutdown ...");
        keepAliveLatch.countDown();
        logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
    }

    // Test methods.

    /**
     * Returns the dynamic configuration of the given stream, falling back to a constant view
     * of the server's distributedlog configuration when the stream has none.
     */
    private DynamicDistributedLogConfiguration getDynConf(String streamName) {
        Optional<DynamicDistributedLogConfiguration> dynDlConf =
                streamConfigProvider.getDynamicStreamConfig(streamName);
        if (dynDlConf.isPresent()) {
            return dynDlConf.get();
        } else {
            return ConfUtils.getConstDynConf(dlConfig);
        }
    }

    /**
     * clean up the gauge before we close to help GC.
     *
     * <p>Each gauge is unregistered from the stats logger it was registered on: the server
     * gauges from the root logger, the stream gauges from the "streams" scope.
     */
    private void unregisterGauge() {
        this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
        this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
        this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
        this.streamsStatsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
        this.streamsStatsLogger.unregisterGauge("cached", this.streamCachedGauge);
    }

    @VisibleForTesting
    Stream newStream(String name) throws IOException {
        return streamManager.getOrCreateStream(name, false);
    }

    @VisibleForTesting
    WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum) {
        return newWriteOp(stream, data, checksum, false);
    }

    @VisibleForTesting
    RoutingService getRoutingService() {
        return this.routingService;
    }

    @VisibleForTesting
    DLSocketAddress getServiceAddress() throws IOException {
        return DLSocketAddress.deserialize(clientId);
    }

    /** Creates a write op for the given stream and payload using this service's settings. */
    WriteOp newWriteOp(String stream,
                       ByteBuffer data,
                       Long checksum,
                       boolean isRecordSet) {
        return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
            serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
            accessControlManager);
    }

    @VisibleForTesting
    Future<List<Void>> closeStreams() {
        return streamManager.closeStreams();
    }

    @VisibleForTesting
    public DistributedLogNamespace getDistributedLogNamespace() {
        return dlNamespace;
    }

    @VisibleForTesting
    StreamManager getStreamManager() {
        return streamManager;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java new file mode 100644 index 0000000..17b5ab3 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/FatalErrorHandler.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +/** + * Implement handling for an unrecoverable error. + * + * <p>Within this package, {@code DistributedLogServiceImpl} implements this interface by releasing + * its keep-alive latch, which lets the main thread shut the service down. + */ +public interface FatalErrorHandler { + + /** + * This method is invoked when an unrecoverable error has occurred + * and no progress can be made. It should implement a shutdown routine. + * + * <p>The method takes no arguments and returns nothing, so the implementation receives no + * description of the error that occurred. + */ + void notifyFatalError(); +}