http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java index 6b79fb4..45c353c 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java @@ -17,6 +17,26 @@ */ package com.alibaba.jstorm.message.netty; +import backtype.storm.Config; +import backtype.storm.messaging.IConnection; +import backtype.storm.messaging.TaskMessage; +import backtype.storm.utils.DisruptorQueue; +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.common.metric.*; +import com.alibaba.jstorm.metric.*; +import com.alibaba.jstorm.utils.JStormServerUtils; +import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.NetWorkUtils; +import com.codahale.metrics.health.HealthCheck; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.HashSet; @@ -29,35 +49,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.slf4j.Logger; -import 
org.slf4j.LoggerFactory; - -import backtype.storm.Config; -import backtype.storm.messaging.IConnection; -import backtype.storm.messaging.TaskMessage; -import backtype.storm.utils.DisruptorQueue; -import backtype.storm.utils.Utils; - -import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.common.metric.Histogram; -import com.alibaba.jstorm.common.metric.Meter; -import com.alibaba.jstorm.common.metric.QueueGauge; -import com.alibaba.jstorm.metric.JStormHealthCheck; -import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetricDef; -import com.alibaba.jstorm.utils.JStormServerUtils; -import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.NetWorkUtils; -import com.codahale.metrics.health.HealthCheck; - class NettyClient implements IConnection { - private static final Logger LOG = LoggerFactory - .getLogger(NettyClient.class); + private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class); + protected String name; protected final int max_retries; @@ -84,9 +78,11 @@ class NettyClient implements IConnection { protected String address; // doesn't use timer, due to competition - protected Histogram sendTimer; - protected Histogram batchSizeHistogram; - protected Meter sendSpeed; + protected AsmHistogram sendTimer; + protected AsmHistogram batchSizeHistogram; + protected AsmMeter sendSpeed; + protected static AsmMeter totalSendSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName( + MetricDef.NETTY_CLI_SEND_SPEED, MetricType.METER), new AsmMeter()); protected ReconnectRunnable reconnector; protected ChannelFactory clientChannelFactory; @@ -94,19 +90,19 @@ class NettyClient implements IConnection { protected Set<Channel> closingChannel; protected AtomicBoolean isConnecting = new AtomicBoolean(false); - + protected NettyConnection nettyConnection; - + protected Map stormConf; - + protected boolean connectMyself; protected Object channelClosing = new Object(); + 
protected boolean enableNettyMetrics; + @SuppressWarnings("rawtypes") - NettyClient(Map storm_conf, ChannelFactory factory, - ScheduledExecutorService scheduler, String host, int port, - ReconnectRunnable reconnector) { + NettyClient(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) { this.stormConf = storm_conf; this.factory = factory; this.scheduler = scheduler; @@ -116,34 +112,21 @@ class NettyClient implements IConnection { channelRef = new AtomicReference<Channel>(null); being_closed = new AtomicBoolean(false); pendings = new AtomicLong(0); - + nettyConnection = new NettyConnection(); - nettyConnection.setClientPort(NetWorkUtils.ip(), - ConfigExtension.getLocalWorkerPort(storm_conf)); + nettyConnection.setClientPort(NetWorkUtils.ip(), ConfigExtension.getLocalWorkerPort(storm_conf)); nettyConnection.setServerPort(host, port); // Configure - buffer_size = - Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); - max_retries = - Math.min(30, Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES))); - base_sleep_ms = - Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); - max_sleep_ms = - Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); + buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); + max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES))); + base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); + max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); timeoutMs = ConfigExtension.getNettyPendingBufferTimeout(storm_conf); - MAX_SEND_PENDING = - (int) ConfigExtension.getNettyMaxSendPending(storm_conf); + MAX_SEND_PENDING = (int) ConfigExtension.getNettyMaxSendPending(storm_conf); - this.messageBatchSize = - Utils.getInt( - 
storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), - 262144); + this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); messageBatchRef = new AtomicReference<MessageBatch>(); // Start the connection attempt. @@ -152,56 +135,62 @@ class NettyClient implements IConnection { connectMyself = isConnectMyself(stormConf, host, port); address = JStormServerUtils.getName(host, port); - - if (connectMyself == false) { + + this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(storm_conf); + LOG.info("** enable netty metrics: {}", this.enableNettyMetrics); + if (!connectMyself) { registerMetrics(); } closingChannel = new HashSet<Channel>(); } - + public void registerMetrics() { - sendTimer = - JStormMetrics.registerWorkerHistogram( - MetricDef.NETTY_CLI_SEND_TIME, nettyConnection.toString()); - batchSizeHistogram = - JStormMetrics.registerWorkerHistogram( - MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection.toString()); - sendSpeed = JStormMetrics.registerWorkerMeter(MetricDef.NETTY_CLI_SEND_SPEED, - nettyConnection.toString()); - - CacheGaugeHealthCheck cacheGauge = - new CacheGaugeHealthCheck(messageBatchRef, - MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString()); - JStormMetrics.registerWorkerGauge(cacheGauge, - MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection.toString()); - JStormHealthCheck.registerWorkerHealthCheck( - MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(), cacheGauge); - - JStormMetrics.registerWorkerGauge( - new com.codahale.metrics.Gauge<Double>() { + if (this.enableNettyMetrics) { + sendTimer = (AsmHistogram) JStormMetrics.registerNettyMetric( + MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection), + MetricType.HISTOGRAM), + new AsmHistogram()); + batchSizeHistogram = (AsmHistogram) JStormMetrics.registerNettyMetric( + MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection), + MetricType.HISTOGRAM), + 
new AsmHistogram()); + sendSpeed = (AsmMeter) JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName( + AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER), new AsmMeter()); + + CacheGaugeHealthCheck cacheGauge = new CacheGaugeHealthCheck(messageBatchRef, + MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString()); + JStormMetrics.registerNettyMetric(MetricUtils + .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE), + new AsmGauge(cacheGauge)); + + JStormMetrics.registerNettyMetric(MetricUtils + .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE), + new AsmGauge(new com.codahale.metrics.Gauge<Double>() { + @Override + public Double getValue() { + return ((Long) pendings.get()).doubleValue(); + } + })); - @Override - public Double getValue() { - return ((Long) pendings.get()).doubleValue(); - } - }, MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection.toString()); - - JStormHealthCheck.registerWorkerHealthCheck( - MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(), + JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(), + cacheGauge); + } + + JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(), new HealthCheck() { - HealthCheck.Result healthy = HealthCheck.Result.healthy(); - HealthCheck.Result unhealthy = HealthCheck.Result.unhealthy - ("NettyConnection " + nettyConnection.toString() + " is broken."); + Result healthy = Result.healthy(); + Result unhealthy = Result + .unhealthy("NettyConnection " + nettyConnection.toString() + " is broken."); + @Override protected Result check() throws Exception { - // TODO Auto-generated method stub if (isChannelReady() == null) { return unhealthy; - }else { + } else { return healthy; } } - + }); } @@ -216,23 +205,28 @@ class NettyClient implements 
IConnection { bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf)); reconnect(); } - + public boolean isConnectMyself(Map conf, String host, int port) { String localIp = NetWorkUtils.ip(); String remoteIp = NetWorkUtils.host2Ip(host); int localPort = ConfigExtension.getLocalWorkerPort(conf); - - if (localPort == port && - localIp.equals(remoteIp)) { + + if (localPort == port && localIp.equals(remoteIp)) { return true; } - + return false; } + public void notifyInterestChanged(Channel channel) { + if (channel.isWritable()) { + MessageBatch messageBatch = messageBatchRef.getAndSet(null); + flushRequest(channel, messageBatch); + } + } + /** * The function can't be synchronized, otherwise it will be deadlock - * */ public void doReconnect() { if (channelRef.get() != null) { @@ -255,12 +249,10 @@ class NettyClient implements IConnection { } long sleepMs = getSleepTimeMs(); - LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name, - sleepMs); + LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name, sleepMs); ChannelFuture future = bootstrap.connect(remote_addr); future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) - throws Exception { + public void operationComplete(ChannelFuture future) throws Exception { isConnecting.set(false); Channel channel = future.getChannel(); if (future.isSuccess()) { @@ -269,17 +261,12 @@ class NettyClient implements IConnection { setChannel(channel); // handleResponse(); } else { - LOG.info( - "Failed to reconnect ... [{}], {}, channel = {}, cause = {}", - retries.get(), name, channel, future.getCause()); + LOG.info("Failed to reconnect ... 
[{}], {}, channel = {}, cause = {}", retries.get(), name, channel, future.getCause()); reconnect(); } } }); JStormUtils.sleepMs(sleepMs); - - return; - } public void reconnect() { @@ -290,7 +277,6 @@ class NettyClient implements IConnection { * # of milliseconds to wait per exponential back-off policy */ private int getSleepTimeMs() { - int sleepMs = base_sleep_ms * retries.incrementAndGet(); if (sleepMs > 1000) { sleepMs = 1000; @@ -310,7 +296,7 @@ class NettyClient implements IConnection { public void send(TaskMessage message) { LOG.warn("Should be overload"); } - + Channel isChannelReady() { Channel channel = channelRef.get(); if (channel == null) { @@ -325,26 +311,28 @@ class NettyClient implements IConnection { return channel; } - protected synchronized void flushRequest(Channel channel, - final MessageBatch requests) { + protected synchronized void flushRequest(Channel channel, final MessageBatch requests) { if (requests == null || requests.isEmpty()) return; - Double batchSize = Double.valueOf(requests.getEncoded_length()); - batchSizeHistogram.update(batchSize); + Long batchSize = (long) requests.getEncoded_length(); + if (batchSizeHistogram != null) { + batchSizeHistogram.update(batchSize); + } pendings.incrementAndGet(); - sendSpeed.update(batchSize); + if (sendSpeed != null) { + sendSpeed.update(batchSize); + } + totalSendSpeed.update(batchSize); ChannelFuture future = channel.write(requests); future.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) - throws Exception { + public void operationComplete(ChannelFuture future) throws Exception { pendings.decrementAndGet(); if (!future.isSuccess()) { Channel channel = future.getChannel(); if (isClosed() == false) { - LOG.info("Failed to send requests to " + name + ": " - + channel.toString() + ":", future.getCause()); + LOG.info("Failed to send requests to " + name + ": " + channel.toString() + ":", future.getCause()); } if (null != channel) { @@ -357,32 +345,29 
@@ class NettyClient implements IConnection { } }); } - + public void unregisterMetrics() { - JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_TIME, - nettyConnection.toString()); - JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_BATCH_SIZE, - nettyConnection.toString()); - JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_PENDING, - nettyConnection.toString()); - JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_CACHE_SIZE, - nettyConnection.toString()); - JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_SPEED, - nettyConnection.toString()); - - JStormHealthCheck - .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE - + ":" + nettyConnection.toString()); - - JStormHealthCheck.unregisterWorkerHealthCheck( - MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString()); + if (this.enableNettyMetrics) { + JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName( + AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection), MetricType.HISTOGRAM)); + JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName( + AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection), MetricType.HISTOGRAM)); + JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName( + AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE)); + JStormMetrics.unregisterNettyMetric(MetricUtils + .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE)); + JStormMetrics.unregisterNettyMetric(MetricUtils + .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER)); + } + JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString()); + + JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString()); } /** * gracefully close this client. 
- * - * We will send all existing requests, and then invoke close_n_release() - * method + * <p/> + * We will send all existing requests, and then invoke close_n_release() method */ public void close() { LOG.info("Close netty connection to {}", name()); @@ -391,7 +376,7 @@ class NettyClient implements IConnection { return; } - if (connectMyself == false) { + if (!connectMyself) { unregisterMetrics(); } @@ -410,17 +395,13 @@ class NettyClient implements IConnection { final long timeoutMilliSeconds = 10 * 1000; final long start = System.currentTimeMillis(); - LOG.info("Waiting for pending batchs to be sent with " + name() - + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, - pendings.get()); + LOG.info("Waiting for pending batchs to be sent with " + name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get()); while (pendings.get() != 0) { try { long delta = System.currentTimeMillis() - start; if (delta > timeoutMilliSeconds) { - LOG.error( - "Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", - name(), pendings.get()); + LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get()); break; } Thread.sleep(1000); // sleep 1s @@ -445,7 +426,7 @@ class NettyClient implements IConnection { /** * Avoid channel double close - * + * * @param channel */ void closeChannel(final Channel channel) { @@ -461,8 +442,7 @@ class NettyClient implements IConnection { LOG.debug(channel.toString() + " begin to closed"); ChannelFuture closeFuture = channel.close(); closeFuture.addListener(new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) - throws Exception { + public void operationComplete(ChannelFuture future) throws Exception { synchronized (channelClosing) { closingChannel.remove(channel); @@ -501,14 +481,9 @@ class NettyClient implements IConnection { retries.set(0); } - final String oldLocalAddres = - (oldChannel 
== null) ? "null" : oldChannel.getLocalAddress() - .toString(); - String newLocalAddress = - (newChannel == null) ? "null" : newChannel.getLocalAddress() - .toString(); - LOG.info("Use new channel {} replace old channel {}", newLocalAddress, - oldLocalAddres); + final String oldLocalAddres = (oldChannel == null) ? "null" : oldChannel.getLocalAddress().toString(); + String newLocalAddress = (newChannel == null) ? "null" : newChannel.getLocalAddress().toString(); + LOG.info("Use new channel {} replace old channel {}", newLocalAddress, oldLocalAddres); // avoid one netty client use too much connection, close old one if (oldChannel != newChannel && oldChannel != null) { @@ -555,60 +530,56 @@ class NettyClient implements IConnection { @Override public Object recv(Integer taskId, int flags) { - throw new UnsupportedOperationException( - "recvTask: Client connection should not receive any messages"); + throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages"); } @Override public void registerQueue(Integer taskId, DisruptorQueue recvQueu) { - throw new UnsupportedOperationException( - "recvTask: Client connection should not receive any messages"); + throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages"); } @Override public void enqueue(TaskMessage message) { - throw new UnsupportedOperationException( - "recvTask: Client connection should not receive any messages"); + throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages"); } - public static class CacheGaugeHealthCheck extends HealthCheck implements - com.codahale.metrics.Gauge<Double> { + public static class CacheGaugeHealthCheck extends HealthCheck implements com.codahale.metrics.Gauge<Double> { AtomicReference<MessageBatch> messageBatchRef; String name; Result healthy; - public CacheGaugeHealthCheck( - AtomicReference<MessageBatch> messageBatchRef, String name) { + public 
CacheGaugeHealthCheck(AtomicReference<MessageBatch> messageBatchRef, String name) { this.messageBatchRef = messageBatchRef; this.name = name; - this.healthy = HealthCheck.Result.healthy(); + this.healthy = Result.healthy(); } @Override public Double getValue() { - // TODO Auto-generated method stub MessageBatch messageBatch = messageBatchRef.get(); if (messageBatch == null) { return 0.0; } else { - Double ret = (double) messageBatch.getEncoded_length(); - return ret; + return (double) messageBatch.getEncoded_length(); } } @Override protected Result check() throws Exception { - // TODO Auto-generated method stub Double size = getValue(); if (size > 8 * JStormUtils.SIZE_1_M) { - return HealthCheck.Result.unhealthy(name - + QueueGauge.QUEUE_IS_FULL); + return Result.unhealthy(name + QueueGauge.QUEUE_IS_FULL); } else { return healthy; } } } + + @Override + public boolean available() { + return (isChannelReady() != null); + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java index 1d582ba..9b58063 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java @@ -17,31 +17,29 @@ */ package com.alibaba.jstorm.message.netty; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.messaging.TaskMessage; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.utils.IntervalCheck; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; class NettyClientAsync extends NettyClient { - private static final Logger LOG = 
LoggerFactory - .getLogger(NettyClientAsync.class); + private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class); public static final String PREFIX = "Netty-Client-"; // when batch buffer size is more than BATCH_THREASHOLD_WARN @@ -54,7 +52,6 @@ class NettyClientAsync extends NettyClient { protected final boolean blockSend; boolean isDirectSend(Map conf) { - if (JStormServerUtils.isOnePending(conf) == true) { return true; } @@ -71,22 +68,15 @@ class NettyClientAsync extends NettyClient { } @SuppressWarnings("rawtypes") - NettyClientAsync(Map storm_conf, ChannelFactory factory, - ScheduledExecutorService scheduler, String host, int port, - ReconnectRunnable reconnector) { + NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) { super(storm_conf, factory, scheduler, host, port, reconnector); - BATCH_THREASHOLD_WARN = - ConfigExtension.getNettyBufferThresholdSize(storm_conf); - + BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf); blockSend = isBlockSend(storm_conf); - directlySend = isDirectSend(storm_conf); flush_later = new AtomicBoolean(false); - flushCheckInterval = - Utils.getInt(storm_conf - .get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); + flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); Runnable flusher = new Runnable() { @Override @@ -95,13 +85,11 @@ class NettyClientAsync extends NettyClient { } }; long initialDelay = Math.min(1000, max_sleep_ms * max_retries); - scheduler.scheduleAtFixedRate(flusher, initialDelay, - flushCheckInterval, TimeUnit.MILLISECONDS); + scheduler.scheduleAtFixedRate(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS); clientChannelFactory = factory; start(); - LOG.info(this.toString()); } @@ -109,7 +97,7 @@ class NettyClientAsync extends NettyClient { * Enqueue a task message to be sent to server */ @Override - 
synchronized public void send(List<TaskMessage> messages) { + public synchronized void send(List<TaskMessage> messages) { // throw exception if the client is being closed if (isClosed()) { LOG.warn("Client is being closed, and does not take requests any more"); @@ -123,13 +111,14 @@ class NettyClientAsync extends NettyClient { throw new RuntimeException(e); } finally { long end = System.nanoTime(); - sendTimer.update((end - start)/1000000.0d); - + if (sendTimer != null) { + sendTimer.update((end - start) / TimeUtils.NS_PER_US); + } } } @Override - synchronized public void send(TaskMessage message) { + public synchronized void send(TaskMessage message) { // throw exception if the client is being closed if (isClosed()) { LOG.warn("Client is being closed, and does not take requests any more"); @@ -143,7 +132,9 @@ class NettyClientAsync extends NettyClient { throw new RuntimeException(e); } finally { long end = System.nanoTime(); - sendTimer.update((end - start)/1000000.0d); + if (sendTimer != null) { + sendTimer.update((end - start) / TimeUtils.NS_PER_US); + } } } @@ -159,21 +150,17 @@ class NettyClientAsync extends NettyClient { long now = System.currentTimeMillis(); long delt = now - begin; if (oneSecond.check() == true) { - LOG.warn( - "Target server {} is unavailable, pending {}, bufferSize {}, block sending {}ms", - name, pendings.get(), cachedSize, delt); + LOG.warn("Target server {} is unavailable, pending {}, bufferSize {}, block sending {}ms", name, pendings.get(), cachedSize, delt); } if (timeoutIntervalCheck.check() == true) { if (messageBatchRef.get() != null) { - LOG.warn( - "Target server {} is unavailable, wait too much time, throw timeout message", - name); + LOG.warn("Target server {} is unavailable, wait too much time, throw timeout message", name); messageBatchRef.set(null); } setChannel(null); LOG.warn("Reset channel as null"); - + if (blockSend == false) { reconnect(); break; @@ -184,12 +171,10 @@ class NettyClientAsync extends NettyClient { 
JStormUtils.sleepMs(sleepMs); if (delt > 2 * timeoutMs * 1000L && changeThreadhold == false) { - if (channelRef.get() != null - && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) { + if (channelRef.get() != null && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) { // it is just channel isn't writable; BATCH_THREASHOLD_WARN = BATCH_THREASHOLD_WARN / 2; - LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", - BATCH_THREASHOLD_WARN); + LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", BATCH_THREASHOLD_WARN); changeThreadhold = true; } @@ -296,12 +281,9 @@ class NettyClientAsync extends NettyClient { } else { if (messageBatchRef.compareAndSet(null, messageBatch)) { flush_later.set(true); - } - else + } else LOG.error("MessageBatch will be lost. This should not happen."); } - - return; } void flush() { @@ -339,12 +321,10 @@ class NettyClientAsync extends NettyClient { @Override public void handleResponse() { // do nothing - return; } @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java index c239dd1..2f08957 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java @@ -17,43 +17,35 @@ */ package com.alibaba.jstorm.message.netty; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import 
java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.commons.lang.builder.ToStringStyle; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.Config; import backtype.storm.messaging.TaskMessage; import backtype.storm.utils.DisruptorQueue; import backtype.storm.utils.Utils; - +import com.alibaba.jstorm.common.metric.AsmGauge; import com.alibaba.jstorm.common.metric.QueueGauge; -import com.alibaba.jstorm.metric.JStormHealthCheck; -import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetricDef; +import com.alibaba.jstorm.metric.*; import com.alibaba.jstorm.utils.JStormServerUtils; import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeUtils; import com.codahale.metrics.Gauge; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; class NettyClientSync extends NettyClient implements EventHandler { - private static final Logger LOG = LoggerFactory - .getLogger(NettyClientSync.class); + private static final Logger LOG = LoggerFactory.getLogger(NettyClientSync.class); private 
ConcurrentLinkedQueue<MessageBatch> batchQueue; private DisruptorQueue disruptorQueue; @@ -63,20 +55,14 @@ class NettyClientSync extends NettyClient implements EventHandler { private AtomicLong emitTs = new AtomicLong(0); @SuppressWarnings("rawtypes") - NettyClientSync(Map storm_conf, ChannelFactory factory, - ScheduledExecutorService scheduler, String host, int port, - ReconnectRunnable reconnector) { + NettyClientSync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) { super(storm_conf, factory, scheduler, host, port, reconnector); batchQueue = new ConcurrentLinkedQueue<MessageBatch>(); - WaitStrategy waitStrategy = - (WaitStrategy) JStormUtils - .createDisruptorWaitStrategy(storm_conf); + WaitStrategy waitStrategy = (WaitStrategy) Utils.newInstance((String) storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY)); - disruptorQueue = - DisruptorQueue.mkInstance(name, ProducerType.MULTI, - MAX_SEND_PENDING * 8, waitStrategy); + disruptorQueue = DisruptorQueue.mkInstance(name, ProducerType.MULTI, MAX_SEND_PENDING * 8, waitStrategy); disruptorQueue.consumerStarted(); if (connectMyself == false) { @@ -93,21 +79,14 @@ class NettyClientSync extends NettyClient implements EventHandler { scheduler.scheduleAtFixedRate(trigger, 10, 1, TimeUnit.SECONDS); /** - * In sync mode, it can't directly use common factory, it will occur - * problem when client close and restart + * In sync mode, it can't directly use common factory, it will occur problem when client close and restart */ - ThreadFactory bossFactory = - new NettyRenameThreadFactory(MetricDef.NETTY_CLI - + JStormServerUtils.getName(host, port) + "-boss"); + ThreadFactory bossFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + JStormServerUtils.getName(host, port) + "-boss"); bossExecutor = Executors.newCachedThreadPool(bossFactory); - ThreadFactory workerFactory = - new NettyRenameThreadFactory(MetricDef.NETTY_CLI - + 
JStormServerUtils.getName(host, port) + "-worker"); + ThreadFactory workerFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + JStormServerUtils.getName(host, port) + "-worker"); workerExecutor = Executors.newCachedThreadPool(workerFactory); - clientChannelFactory = - new NioClientSocketChannelFactory(bossExecutor, workerExecutor, - 1); + clientChannelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, 1); start(); @@ -115,24 +94,24 @@ class NettyClientSync extends NettyClient implements EventHandler { } public void registerSyncMetrics() { - JStormMetrics.registerWorkerGauge(new Gauge<Double>() { - @Override - public Double getValue() { - return Double.valueOf(batchQueue.size()); - } - }, MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE, nettyConnection.toString()); - - QueueGauge cacheQueueGauge = - new QueueGauge(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE - + nettyConnection.toString(), disruptorQueue); - - JStormMetrics - .registerWorkerGauge(cacheQueueGauge, - MetricDef.NETTY_CLI_SYNC_DISR_QUEUE, - nettyConnection.toString()); - JStormHealthCheck.registerWorkerHealthCheck( - MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" - + nettyConnection.toString(), cacheQueueGauge); + if (enableNettyMetrics) { + JStormMetrics.registerNettyMetric(MetricUtils + .nettyMetricName(MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE + nettyConnection.toString(), MetricType.GAUGE), + new AsmGauge(new Gauge<Double>() { + @Override + public Double getValue() { + return (double) batchQueue.size(); + } + })); + + QueueGauge cacheQueueGauge = new QueueGauge(disruptorQueue, MetricDef.NETTY_CLI_SYNC_DISR_QUEUE, nettyConnection.toString()); + + JStormMetrics.registerNettyMetric(MetricUtils + .nettyMetricName(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + nettyConnection.toString(), MetricType.GAUGE), + new AsmGauge(cacheQueueGauge)); + JStormHealthCheck.registerWorkerHealthCheck( + MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" + nettyConnection.toString(), cacheQueueGauge); + } } /** @@ -166,8 +145,6 @@ class 
NettyClientSync extends NettyClient implements EventHandler { /** * Don't take care of competition - * - * @param blocked */ public void sendData() { long start = System.nanoTime(); @@ -188,12 +165,13 @@ class NettyClientSync extends NettyClient implements EventHandler { JStormUtils.halt_process(-1, err); } finally { long end = System.nanoTime(); - sendTimer.update((end - start) / 1000000.0d); + if (sendTimer != null) { + sendTimer.update((end - start) / TimeUtils.NS_PER_US); + } } } public void sendAllData() { - long start = System.nanoTime(); try { disruptorQueue.consumeBatch(this); @@ -216,7 +194,9 @@ class NettyClientSync extends NettyClient implements EventHandler { JStormUtils.halt_process(-1, err); } finally { long end = System.nanoTime(); - sendTimer.update((end - start) / 1000000.0d); + if (sendTimer != null) { + sendTimer.update((end - start) / TimeUtils.NS_PER_US); + } } } @@ -227,8 +207,7 @@ class NettyClientSync extends NettyClient implements EventHandler { } @Override - public void onEvent(Object event, long sequence, boolean endOfBatch) - throws Exception { + public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception { if (event == null) { return; } @@ -296,22 +275,19 @@ class NettyClientSync extends NettyClient implements EventHandler { } public void unregisterSyncMetrics() { - JStormMetrics.unregisterWorkerMetric( - MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE, - nettyConnection.toString()); - JStormMetrics - .unregisterWorkerMetric(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE, - nettyConnection.toString()); - JStormHealthCheck - .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE - + ":" + nettyConnection.toString()); + if (enableNettyMetrics) { + JStormMetrics.unregisterNettyMetric(MetricUtils + .nettyMetricName(MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE + nettyConnection.toString(), MetricType.GAUGE)); + JStormMetrics.unregisterNettyMetric(MetricUtils + .nettyMetricName(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + 
nettyConnection.toString(), MetricType.GAUGE)); + JStormHealthCheck + .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" + nettyConnection.toString()); + } } @Override public void close() { - LOG.info( - "Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}", - name, batchQueue.size(), disruptorQueue.population()); + LOG.info("Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}", name, batchQueue.size(), disruptorQueue.population()); sendAllData(); disruptorQueue.haltWithInterrupt(); if (connectMyself == false) { @@ -326,7 +302,6 @@ class NettyClientSync extends NettyClient implements EventHandler { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java index cd8c0fa..4f2358f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java @@ -21,23 +21,23 @@ import java.io.Serializable; import com.alibaba.jstorm.utils.NetWorkUtils; -public class NettyConnection implements Serializable{ +public class NettyConnection implements Serializable { protected String clientPort; protected String serverPort; - + public String getClientPort() { return clientPort; } - + public void setClientPort(String client, int port) { String ip = NetWorkUtils.host2Ip(client); clientPort = ip + ":" + port; } - + public String getServerPort() { return serverPort; } - + 
public void setServerPort(String server, int port) { String ip = NetWorkUtils.host2Ip(server); serverPort = ip + ":" + port; @@ -47,12 +47,8 @@ public class NettyConnection implements Serializable{ public int hashCode() { final int prime = 31; int result = 1; - result = - prime * result - + ((clientPort == null) ? 0 : clientPort.hashCode()); - result = - prime * result - + ((serverPort == null) ? 0 : serverPort.hashCode()); + result = prime * result + ((clientPort == null) ? 0 : clientPort.hashCode()); + result = prime * result + ((serverPort == null) ? 0 : serverPort.hashCode()); return result; } @@ -77,15 +73,14 @@ public class NettyConnection implements Serializable{ return false; return true; } - + @Override public String toString() { - return clientPort + "->" + serverPort; + return clientPort + "->" + serverPort; } - - public static String mkString(String client, int clientPort, - String server, int serverPort) { + + public static String mkString(String client, int clientPort, String server, int serverPort) { return client + ":" + clientPort + "->" + server + ":" + serverPort; } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java index a6ddd9a..1a090f2 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java @@ -40,8 +40,7 @@ import com.alibaba.jstorm.metric.MetricDef; import com.alibaba.jstorm.utils.JStormUtils; public class NettyContext implements IContext { - private final static Logger LOG = LoggerFactory - .getLogger(NettyContext.class); + private final static Logger LOG = LoggerFactory.getLogger(NettyContext.class); 
@SuppressWarnings("rawtypes") private Map storm_conf; @@ -65,36 +64,19 @@ public class NettyContext implements IContext { public void prepare(Map storm_conf) { this.storm_conf = storm_conf; - int maxWorkers = - Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS)); - ThreadFactory bossFactory = - new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "boss"); - ThreadFactory workerFactory = - new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "worker"); + int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS)); + ThreadFactory bossFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "boss"); + ThreadFactory workerFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "worker"); if (maxWorkers > 0) { clientChannelFactory = - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), - maxWorkers); + new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers); } else { - clientChannelFactory = - new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory)); } - int otherWorkers = - Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1; - int poolSize = - Math.min(Math.max(1, otherWorkers), - MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE); - clientScheduleService = - Executors - .newScheduledThreadPool(poolSize, - new NettyRenameThreadFactory( - "client-schedule-service")); + int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1; + int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE); + clientScheduleService = Executors.newScheduledThreadPool(poolSize, new 
NettyRenameThreadFactory("client-schedule-service")); reconnector = new ReconnectRunnable(); new AsyncLoopThread(reconnector, true, Thread.MIN_PRIORITY, true); @@ -119,11 +101,9 @@ public class NettyContext implements IContext { @Override public IConnection connect(String topology_id, String host, int port) { if (isSyncMode == true) { - return new NettyClientSync(storm_conf, clientChannelFactory, - clientScheduleService, host, port, reconnector); + return new NettyClientSync(storm_conf, clientChannelFactory, clientScheduleService, host, port, reconnector); } else { - return new NettyClientAsync(storm_conf, clientChannelFactory, - clientScheduleService, host, port, reconnector); + return new NettyClientAsync(storm_conf, clientChannelFactory, clientScheduleService, host, port, reconnector); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java index 2e060c2..5d38fc5 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java @@ -27,8 +27,7 @@ public class NettyRenameThreadFactory implements ThreadFactory { static { // Rename Netty threads - ThreadRenamingRunnable - .setThreadNameDeterminer(ThreadNameDeterminer.CURRENT); + ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT); } final ThreadGroup group; @@ -37,15 +36,12 @@ public class NettyRenameThreadFactory implements ThreadFactory { NettyRenameThreadFactory(String name) { SecurityManager s = System.getSecurityManager(); - group = - (s != null) ? 
s.getThreadGroup() : Thread.currentThread() - .getThreadGroup(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.name = name; } public Thread newThread(Runnable r) { - Thread t = - new Thread(group, r, name + "-" + index.getAndIncrement(), 0); + Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java index d00b24f..a5fa859 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java @@ -44,21 +44,19 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.client.ConfigExtension; class NettyServer implements IConnection { - private static final Logger LOG = LoggerFactory - .getLogger(NettyServer.class); + private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class); @SuppressWarnings("rawtypes") Map storm_conf; int port; // private LinkedBlockingQueue message_queue; - volatile ChannelGroup allChannels = - new DefaultChannelGroup("jstorm-server"); + volatile ChannelGroup allChannels = new DefaultChannelGroup("jstorm-server"); final ChannelFactory factory; final ServerBootstrap bootstrap; // ayncBatch is only one solution, so directly set it as true private final boolean isSyncMode; - + private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues; @SuppressWarnings("rawtypes") @@ -69,30 +67,17 @@ class NettyServer implements IConnection { this.deserializeQueues = deserializeQueues; // Configure the server. 
- int buffer_size = - Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); - int maxWorkers = - Utils.getInt(storm_conf - .get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)); + int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); + int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)); // asyncBatch = ConfigExtension.isNettyTransferAsyncBatch(storm_conf); - ThreadFactory bossFactory = - new NettyRenameThreadFactory("server" + "-boss"); - ThreadFactory workerFactory = - new NettyRenameThreadFactory("server" + "-worker"); + ThreadFactory bossFactory = new NettyRenameThreadFactory("server" + "-boss"); + ThreadFactory workerFactory = new NettyRenameThreadFactory("server" + "-worker"); if (maxWorkers > 0) { - factory = - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), - maxWorkers); + factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers); } else { - factory = - new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory)); } bootstrap = new ServerBootstrap(factory); @@ -108,8 +93,7 @@ class NettyServer implements IConnection { Channel channel = bootstrap.bind(new InetSocketAddress(port)); allChannels.add(channel); - LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", port, - buffer_size, maxWorkers); + LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", port, buffer_size, maxWorkers); } @Override @@ -129,8 +113,7 @@ class NettyServer implements IConnection { DisruptorQueue queue = deserializeQueues.get(task); if (queue == null) { - LOG.debug("Received invalid message 
directed at port " + task - + ". Dropping..."); + LOG.debug("Received invalid message directed at port " + task + ". Dropping..."); return; } @@ -138,8 +121,7 @@ class NettyServer implements IConnection { } /** - * fetch a message from message queue synchronously (flags != 1) or - * asynchronously (flags==1) + * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1) */ public Object recv(Integer taskId, int flags) { try { @@ -211,14 +193,12 @@ class NettyServer implements IConnection { @Override public void send(List<TaskMessage> messages) { - throw new UnsupportedOperationException( - "Server connection should not send any messages"); + throw new UnsupportedOperationException("Server connection should not send any messages"); } @Override public void send(TaskMessage message) { - throw new UnsupportedOperationException( - "Server connection should not send any messages"); + throw new UnsupportedOperationException("Server connection should not send any messages"); } @Override @@ -231,4 +211,8 @@ class NettyServer implements IConnection { return isSyncMode; } + @Override + public boolean available() { + return true; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java index dcf2a5d..f5ec324 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java @@ -26,11 +26,9 @@ import org.slf4j.LoggerFactory; import com.alibaba.jstorm.callback.RunnableCallback; public class ReconnectRunnable extends RunnableCallback { - private static final Logger LOG = LoggerFactory - 
.getLogger(ReconnectRunnable.class); + private static final Logger LOG = LoggerFactory.getLogger(ReconnectRunnable.class); - private BlockingQueue<NettyClient> queue = - new LinkedBlockingDeque<NettyClient>(); + private BlockingQueue<NettyClient> queue = new LinkedBlockingDeque<NettyClient>(); public void pushEvent(NettyClient client) { queue.offer(client); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java index f84c2f0..7b511e9 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java @@ -30,8 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StormClientHandler extends SimpleChannelUpstreamHandler { - private static final Logger LOG = LoggerFactory - .getLogger(StormClientHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class); private NettyClient client; private AtomicBoolean being_closed; @@ -41,16 +40,25 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { } /** - * Sometime when connect one bad channel which isn't writable, it will call - * this function + * @@@ Comment this function + * + * Don't allow call from low netty layer, whose call will try to obtain the lock of jstorm netty layer + * otherwise it will lead to deadlock + */ +// @Override +// public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { +// +// client.notifyInterestChanged(e.getChannel()); +// } + + /** + * Sometime when connect one bad channel which isn't writable, it will call this 
function */ @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent event) { + public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) { // register the newly established channel Channel channel = event.getChannel(); - LOG.info("connection established to :{}, local port:{}", - client.getRemoteAddr(), channel.getLocalAddress()); + LOG.info("connection established to :{}, local port:{}", client.getRemoteAddr(), channel.getLocalAddress()); client.handleResponse(); } @@ -63,8 +71,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { /** * - * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext, - * org.jboss.netty.channel.ExceptionEvent) + * @see SimpleChannelUpstreamHandler#exceptionCaught(ChannelHandlerContext, + * ExceptionEvent) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) { @@ -82,14 +90,12 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { /** * Attention please, * - * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext, - * org.jboss.netty.channel.ChannelStateEvent) + * @see SimpleChannelUpstreamHandler#channelDisconnected(ChannelHandlerContext, + * ChannelStateEvent) */ @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - LOG.info("Receive channelDisconnected to {}, channel = {}", - client.getRemoteAddr(), e.getChannel()); + public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + LOG.info("Receive channelDisconnected to {}, channel = {}", client.getRemoteAddr(), e.getChannel()); // ctx.sendUpstream(e); super.channelDisconnected(ctx, e); @@ -97,10 +103,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler { } @Override - public void 
channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - LOG.info("Connection to {} has been closed, channel = {}", - client.getRemoteAddr(), e.getChannel()); + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + LOG.info("Connection to {} has been closed, channel = {}", client.getRemoteAddr(), e.getChannel()); super.channelClosed(ctx, e); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java index 080f91c..8927809 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java @@ -27,12 +27,12 @@ import com.alibaba.jstorm.client.ConfigExtension; class StormClientPipelineFactory implements ChannelPipelineFactory { private NettyClient client; - private Map conf; + private Map conf; StormClientPipelineFactory(NettyClient client, Map conf) { this.client = client; this.conf = conf; - + } public ChannelPipeline getPipeline() throws Exception { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java index 916ce93..c1b9cf2 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java +++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java @@ -33,8 +33,7 @@ import org.slf4j.LoggerFactory; import backtype.storm.messaging.TaskMessage; class StormServerHandler extends SimpleChannelUpstreamHandler { - private static final Logger LOG = LoggerFactory - .getLogger(StormServerHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class); private NettyServer server; private Map<Channel, Integer> failureCounters; @@ -71,29 +70,28 @@ class StormServerHandler extends SimpleChannelUpstreamHandler { LOG.info("Connection established {}", e.getChannel().getRemoteAddress()); server.addChannel(e.getChannel()); } - + @Override - public void childChannelClosed( - ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { + public void childChannelClosed(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception { super.childChannelClosed(ctx, e); LOG.info("Connection closed {}", e.getChildChannel().getRemoteAddress()); - + MessageDecoder.removeTransmitHistogram(e.getChildChannel()); } - + @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelDisconnected(ctx, e); LOG.info("Connection channelDisconnected {}", e.getChannel().getRemoteAddress()); - + MessageDecoder.removeTransmitHistogram(e.getChannel()); }; - + @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelClosed(ctx, e); LOG.info("Connection channelClosed {}", e.getChannel().getRemoteAddress()); - + MessageDecoder.removeTransmitHistogram(e.getChannel()); }; @@ -131,8 +129,7 @@ class StormServerHandler extends SimpleChannelUpstreamHandler { public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { // removeFailureCounter(e.getChannel()); if (e.getChannel() != null) { - LOG.info("Channel occur exception {}", e.getChannel() - .getRemoteAddress()); + LOG.info("Channel occur 
exception {}", e.getChannel().getRemoteAddress()); } server.closeChannel(e.getChannel()); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java index 9dead91..6489d4f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java @@ -26,7 +26,7 @@ import org.jboss.netty.channel.Channels; class StormServerPipelineFactory implements ChannelPipelineFactory { private NettyServer server; private Map conf; - + StormServerPipelineFactory(NettyServer server, Map conf) { this.server = server; this.conf = conf; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java deleted file mode 100755 index 760e538..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java +++ /dev/null @@ -1,267 +0,0 @@ -package com.alibaba.jstorm.metric; - -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.http.HttpEntity; -import org.apache.http.NameValuePair; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import 
org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; -import org.apache.log4j.Logger; - -import backtype.storm.utils.Utils; - -public class AlimonitorClient extends MetricSendClient { - - public static Logger LOG = Logger.getLogger(AlimonitorClient.class); - - // Send to localhost:15776 by default - public static final String DEFAUT_ADDR = "127.0.0.1"; - public static final String DEFAULT_PORT = "15776"; - public static final int DEFAUTL_FLAG = 0; - public static final String DEFAULT_ERROR_INFO = ""; - - private final String COLLECTION_FLAG = "collection_flag"; - private final String ERROR_INFO = "error_info"; - private final String MSG = "MSG"; - - private String port; - private String requestIP; - private String monitorName; - private int collectionFlag; - private String errorInfo; - - private boolean post; - - public AlimonitorClient() { - } - - public AlimonitorClient(String requestIP, String port, boolean post) { - this.requestIP = requestIP; - this.port = port; - this.post = post; - this.monitorName = null; - this.collectionFlag = 0; - this.errorInfo = null; - } - - public void setIpAddr(String ipAddr) { - this.requestIP = ipAddr; - } - - public void setPort(String port) { - this.port = port; - } - - public void setMonitorName(String monitorName) { - this.monitorName = monitorName; - } - - public void setCollectionFlag(int flag) { - this.collectionFlag = flag; - } - - public void setErrorInfo(String msg) { - this.errorInfo = msg; - } - - public void setPostFlag(boolean post) { - this.post = post; - } - - public String buildURL() { - return "http://" + requestIP + ":" + port + "/passive"; - } - - public String buildRqstAddr() { - return "http://" + requestIP + ":" + port + "/passive?name=" - + monitorName + "&msg="; - } - - @Override - public boolean send(Map<String, 
Object> msg) { - try { - if (monitorName == null) { - LOG.warn("monitor name is null"); - return false; - } - return sendRequest(collectionFlag, errorInfo, msg); - } catch (Exception e) { - LOG.error("Failed to sendRequest", e); - return false; - } - } - - @Override - public boolean send(List<Map<String, Object>> msg) { - try { - if (monitorName == null) { - LOG.warn("monitor name is null"); - return false; - } - return sendRequest(collectionFlag, errorInfo, msg); - } catch (Exception e) { - LOG.error("Failed to sendRequest", e); - return false; - } - } - - public Map buildAliMonitorMsg(int collection_flag, String error_message) { - // Json format of the message sent to Alimonitor - // { - // "collection_flag":int, - // "error_info":string, - // "MSG": ojbect | array - // } - Map ret = new HashMap(); - ret.put(COLLECTION_FLAG, collection_flag); - ret.put(ERROR_INFO, error_message); - ret.put(MSG, null); - - return ret; - } - - private void addMsgData(Map jsonObj, Map<String, Object> map) { - jsonObj.put(MSG, map); - } - - private void addMsgData(Map jsonObj, List<Map<String, Object>> mapList) { - // JSONArray jsonArray = new JSONArray(); - // for(Map<String, Object> map : mapList) { - // jsonArray.add(map); - // } - - jsonObj.put(MSG, mapList); - } - - private boolean sendRequest(int collection_flag, String error_message, - Map<String, Object> msg) throws Exception { - boolean ret = false; - - if (msg.size() == 0) - return ret; - - Map jsonObj = buildAliMonitorMsg(collection_flag, error_message); - addMsgData(jsonObj, msg); - String jsonMsg = jsonObj.toString(); - LOG.info(jsonMsg); - - if (post == true) { - String url = buildURL(); - ret = httpPost(url, jsonMsg); - } else { - String request = buildRqstAddr(); - StringBuilder postAddr = new StringBuilder(); - postAddr.append(request); - postAddr.append(URLEncoder.encode(jsonMsg)); - - ret = httpGet(postAddr); - } - - return ret; - } - - private boolean sendRequest(int collection_flag, String error_message, - 
List<Map<String, Object>> msgList) throws Exception { - boolean ret = false; - - if (msgList.size() == 0) - return ret; - - Map jsonObj = buildAliMonitorMsg(collection_flag, error_message); - addMsgData(jsonObj, msgList); - - String jsonMsg = Utils.to_json(jsonObj); - LOG.info(jsonMsg); - - if (post == true) { - String url = buildURL(); - ret = httpPost(url, jsonMsg); - } else { - String request = buildRqstAddr(); - StringBuilder postAddr = new StringBuilder(); - postAddr.append(request); - postAddr.append(URLEncoder.encode(jsonMsg)); - - ret = httpGet(postAddr); - } - - return ret; - } - - private boolean httpGet(StringBuilder postAddr) { - boolean ret = false; - - CloseableHttpClient httpClient = HttpClientBuilder.create().build(); - CloseableHttpResponse response = null; - - try { - HttpGet request = new HttpGet(postAddr.toString()); - response = httpClient.execute(request); - HttpEntity entity = response.getEntity(); - if (entity != null) { - LOG.info(EntityUtils.toString(entity)); - } - EntityUtils.consume(entity); - ret = true; - } catch (Exception e) { - LOG.error("Exception when sending http request to alimonitor", e); - } finally { - try { - if (response != null) - response.close(); - httpClient.close(); - } catch (Exception e) { - LOG.error("Exception when closing httpclient", e); - } - } - - return ret; - } - - private boolean httpPost(String url, String msg) { - boolean ret = false; - - CloseableHttpClient httpClient = HttpClientBuilder.create().build(); - CloseableHttpResponse response = null; - - try { - HttpPost request = new HttpPost(url); - List<NameValuePair> nvps = new ArrayList<NameValuePair>(); - nvps.add(new BasicNameValuePair("name", monitorName)); - nvps.add(new BasicNameValuePair("msg", msg)); - request.setEntity(new UrlEncodedFormEntity(nvps)); - response = httpClient.execute(request); - HttpEntity entity = response.getEntity(); - if (entity != null) { - LOG.info(EntityUtils.toString(entity)); - } - EntityUtils.consume(entity); - ret = 
true; - } catch (Exception e) { - LOG.error("Exception when sending http request to alimonitor", e); - } finally { - try { - if (response != null) - response.close(); - httpClient.close(); - } catch (Exception e) { - LOG.error("Exception when closing httpclient", e); - } - } - - return ret; - } - - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java new file mode 100644 index 0000000..4313b6f --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.metric; + +import com.alibaba.jstorm.common.metric.AsmMetric; + +import java.io.Serializable; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public interface AsmMetricFilter extends Serializable { + /** + * Matches all metrics, regardless of type or name. 
+ */ + AsmMetricFilter ALL = new AsmMetricFilter() { + private static final long serialVersionUID = 7089987006352295530L; + + @Override + public boolean matches(String name, AsmMetric metric) { + return true; + } + }; + + /** + * Returns {@code true} if the metric matches the filter; {@code false} otherwise. + * + * @param name the metric node + * @param metric the metric + * @return {@code true} if the metric matches the filter + */ + boolean matches(String name, AsmMetric metric); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java new file mode 100644 index 0000000..710da9d --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.jstorm.metric; + +import com.alibaba.jstorm.common.metric.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Metric registry keyed by metric name. Generally the methods of this class should not be exposed; + * the wrapper methods in {@code JStormMonitorMetrics} should be called instead. + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class AsmMetricRegistry implements AsmMetricSet { + private static final long serialVersionUID = 8184106900230111064L; + private static final Logger LOG = LoggerFactory.getLogger(AsmMetricRegistry.class); + + protected final ConcurrentMap<String, AsmMetric> metrics = new ConcurrentHashMap<String, AsmMetric>(); + + public int size() { + return metrics.size(); + } + + /** + * Registers the given {@link AsmMetric} under the given name unless that name is already taken. + * The metric's name is set to {@code name} before the registration is attempted, so it is + * updated even when the name turns out to be a duplicate. + * + * @param name the metric node + * @param metric the metric + * @param <T> the type of the metric + * @return {@code metric} if it was newly registered; otherwise the metric that was already + * registered under {@code name} (a warning is logged and the existing entry is kept) + * @throws IllegalArgumentException declared in the signature, but a duplicate name does not + * raise it in this method + */ + @SuppressWarnings("unchecked") + public <T extends AsmMetric> AsmMetric register(String name, T metric) throws IllegalArgumentException { + metric.setMetricName(name); + final AsmMetric existing = metrics.putIfAbsent(name, metric); + if (existing == null) { + LOG.info("Successfully register metric of {}", name); + return metric; + } else { + LOG.warn("duplicate metric: {}", name); + return existing; + } + } + + /** + * Removes the metric with the given name. 
+ * + * @param name the metric node + * @return whether or not the metric was removed + */ + public boolean remove(String name) { + final AsmMetric metric = metrics.remove(name); + if (metric != null) { + LOG.info("Successfully unregister metric of {}", name); + return true; + } + return false; + } + + public AsmMetric getMetric(String name) { + return metrics.get(name); + } + + /** + * Returns an unmodifiable, sorted copy of the names of all the metrics in the registry. + * + * @return the names of all the metrics + */ + public SortedSet<String> getMetricNames() { + return Collections.unmodifiableSortedSet(new TreeSet<String>(metrics.keySet())); + } + + /** + * Returns a map of all the gauges in the registry and their names. + * + * @return all the gauges in the registry + */ + public SortedMap<String, AsmGauge> getGauges() { + return getGauges(AsmMetricFilter.ALL); + } + + /** + * Returns a map of all the gauges in the registry and their names which match the given filter. + * + * @param filter the metric filter to match + * @return the gauges in the registry that match {@code filter} + */ + public SortedMap<String, AsmGauge> getGauges(AsmMetricFilter filter) { + return getMetrics(AsmGauge.class, filter); + } + + /** + * Returns a map of all the counters in the registry and their names. + * + * @return all the counters in the registry + */ + public SortedMap<String, AsmCounter> getCounters() { + return getCounters(AsmMetricFilter.ALL); + } + + /** + * Returns a map of all the counters in the registry and their names which match the given filter. + * + * @param filter the metric filter to match + * @return the counters in the registry that match {@code filter} + */ + public SortedMap<String, AsmCounter> getCounters(AsmMetricFilter filter) { + return getMetrics(AsmCounter.class, filter); + } + + /** + * Returns a map of all the histograms in the registry and their names. 
+ * + * @return all the histograms in the registry + */ + public SortedMap<String, AsmHistogram> getHistograms() { + return getHistograms(AsmMetricFilter.ALL); + } + + /** + * Returns a map of all the histograms in the registry and their names which match the given filter. + * + * @param filter the metric filter to match + * @return the histograms in the registry that match {@code filter} + */ + public SortedMap<String, AsmHistogram> getHistograms(AsmMetricFilter filter) { + return getMetrics(AsmHistogram.class, filter); + } + + /** + * Returns a map of all the meters in the registry and their names. + * + * @return all the meters in the registry + */ + public SortedMap<String, AsmMeter> getMeters() { + return getMeters(AsmMetricFilter.ALL); + } + + /** + * Returns a map of all the meters in the registry and their names which match the given filter. + * + * @param filter the metric filter to match + * @return the meters in the registry that match {@code filter} + */ + public SortedMap<String, AsmMeter> getMeters(AsmMetricFilter filter) { + return getMetrics(AsmMeter.class, filter); + } + + /** + * Returns a map of all the timers in the registry and their names. + * + * @return all the timers in the registry + */ + public SortedMap<String, AsmTimer> getTimers() { + return getTimers(AsmMetricFilter.ALL); + } + + /** + * Returns a map of all the timers in the registry and their names which match the given filter. 
+ * + * @param filter the metric filter to match + * @return the timers in the registry that match {@code filter} + */ + public SortedMap<String, AsmTimer> getTimers(AsmMetricFilter filter) { + return getMetrics(AsmTimer.class, filter); + } + + @SuppressWarnings("unchecked") + private <T extends AsmMetric> SortedMap<String, T> getMetrics(Class<T> klass, AsmMetricFilter filter) { + final TreeMap<String, T> timers = new TreeMap<String, T>(); + for (Map.Entry<String, AsmMetric> entry : metrics.entrySet()) { + if (klass.isInstance(entry.getValue()) && filter.matches(entry.getKey(), entry.getValue())) { + timers.put(entry.getKey(), (T) entry.getValue()); + } + } + return timers; + } + + @Override + public Map<String, AsmMetric> getMetrics() { + return metrics; + } + +}
