Ethanlm commented on a change in pull request #3307:
URL: https://github.com/apache/storm/pull/3307#discussion_r466760053
##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/WindowedMeter.java
##########
@@ -76,65 +66,47 @@ public CountStatAndMetric(int numBuckets) {
odBuckets = new long[numBuckets];
odTime = new long[numBuckets];
allTime = 0;
- exactExtra = 0;
- bucketStart = startTime >= 0 ? startTime : System.currentTimeMillis();
+ bucketStart = Time.currentTimeMillis();
currentBucket = new AtomicLong(0);
- if (startTime < 0) {
- task = new Fresher();
- MetricStatTimer.timer.scheduleAtFixedRate(task, tmSize, tmSize);
- } else {
- task = null;
Review comment:
I'm not sure whether this is actually a problem, but it seems odd to use `scheduleAtFixedRate` together with simulated time: simulated time doesn't advance by itself, whereas `scheduleAtFixedRate` fires based on real (wall-clock) time.
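One way to avoid mixing the two clocks, sketched under the assumption that rotation could be checked lazily instead of by a timer. The `LazyRotatingCounter` below is hypothetical (not Storm code). It reads time from an injected `LongSupplier`, so a test's simulated clock drives rotation and no wall-clock `Timer` is involved.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical sketch, not Storm's WindowedMeter: rotation is checked lazily
// against an injected clock on every update/read, so no wall-clock Timer is needed.
final class LazyRotatingCounter {
    private final LongSupplier clock;   // wall clock in production, simulated clock in tests
    private final long bucketSizeMs;
    private long bucketStart;
    private long currentBucket;
    private long rotations;

    LazyRotatingCounter(LongSupplier clock, long bucketSizeMs) {
        this.clock = clock;
        this.bucketSizeMs = bucketSizeMs;
        this.bucketStart = clock.getAsLong();
    }

    // Rotate once for every full bucket period that has elapsed since bucketStart.
    private void maybeRotate() {
        long now = clock.getAsLong();
        while (now - bucketStart >= bucketSizeMs) {
            // A real meter would push currentBucket into its history buckets here.
            currentBucket = 0;
            bucketStart += bucketSizeMs;
            rotations++;
        }
    }

    synchronized void incBy(long n) {
        maybeRotate();
        currentBucket += n;
    }

    synchronized long current() {
        maybeRotate();
        return currentBucket;
    }

    synchronized long rotations() {
        maybeRotate();
        return rotations;
    }

    public static void main(String[] args) {
        AtomicLong simulatedNow = new AtomicLong(0);
        LazyRotatingCounter counter = new LazyRotatingCounter(simulatedNow::get, 1000);
        counter.incBy(5);
        System.out.println(counter.current());   // 5
        simulatedNow.addAndGet(2500);            // advance simulated time by 2.5 buckets
        System.out.println(counter.current());   // 0 (two rotations happened)
        System.out.println(counter.rotations()); // 2
    }
}
```

The trade-off is that history only advances when the counter is touched, so every read must go through a method that rotates first (as `current()` does here).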
##########
File path: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
##########
@@ -357,8 +345,7 @@ public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta,
spout.fail(tupleInfo.getMessageId());
new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
if (timeDelta != null) {
- executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
- taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
+ executor.getExecutorMetrics().spoutFailedTuple(taskData.getComponentId(), tupleInfo.getStream(), taskData.getTaskId());
Review comment:
Not a big deal, but maybe change `executor.getExecutorMetrics()` to `executorMetrics` for consistency with other places.
##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
##########
@@ -153,9 +160,20 @@ public Histogram histogram(String name, TopologyContext context) {
return histogram;
}
+ public Histogram histogram(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
+ MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, workerPort);
+ Histogram histogram = registry.histogram(metricNames.getLongName());
+ saveMetricTaskIdMapping(taskId, metricNames, histogram, taskIdHistograms);
+ return histogram;
+ }
+
private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, MetricNames names, T metric, Map<Integer, Map<String, T>> taskIdMetrics) {
Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
+ if (metrics.get(names.getV2TickName()) != null) {
+ LOG.warn("Adding duplicate short metric for " + names.getV2TickName()
Review comment:
What happens in that case?
In which situations would there be a duplicate?
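To make the question concrete: the hunk stops at the warning, so the following is a hypothetical sketch that assumes the method then stores the metric with `Map.put` (the real code is not shown). Under that assumption, a second registration under the same short (V2 tick) name would overwrite the first entry in the per-task map, so the earlier metric would no longer be reachable through that mapping. A plain `String` stands in for `names.getV2TickName()` and `System.err` stands in for `LOG.warn`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-task mapping; it ASSUMES the real method
// calls put() after the warning, which the diff does not show.
final class TaskMetricMappingSketch {
    static <T> T saveMetricTaskIdMapping(Integer taskId, String shortName, T metric,
                                         Map<Integer, Map<String, T>> taskIdMetrics) {
        Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, tid -> new HashMap<>());
        if (metrics.get(shortName) != null) {
            System.err.println("Adding duplicate short metric for " + shortName); // stand-in for LOG.warn
        }
        // put() returns the previous value; that metric is dropped from the mapping.
        return metrics.put(shortName, metric);
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> byTask = new HashMap<>();
        saveMetricTaskIdMapping(1, "__fail-count-stream1", "meterA", byTask);
        String replaced = saveMetricTaskIdMapping(1, "__fail-count-stream1", "meterB", byTask);
        System.out.println(replaced);                                  // meterA
        System.out.println(byTask.get(1).get("__fail-count-stream1")); // meterB
    }
}
```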
##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/BoltExecutorMetrics.java
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ExecutorSpecificStats;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+
+public class BoltExecutorMetrics extends ExecutorMetrics {
+ private final ConcurrentMap<String, WindowedMeter> executedByStream = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, WindowedHistogram> processLatencies = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, WindowedHistogram> executeLatencies = new ConcurrentHashMap<>();
+
+ public BoltExecutorMetrics(WorkerTopologyContext context, StormMetricRegistry metricRegistry,
+ Map<String, Object> topoConf) {
+ super(context, metricRegistry, topoConf);
+ }
+
+ public void boltFailedTuple(String componentId, String streamId, int taskId) {
+ String key = createKey(componentId, streamId);
+ WindowedMeter meter = this.failedByStream.get(key);
+ if (meter == null) {
+ String name = createMetricName("__fail-count", componentId, streamId);
Review comment:
One issue: this `componentId` is not the component ID of this `taskId`. It is the upstream component that sent the tuple, so it is really a `srcComponentId`.
See https://github.com/apache/storm/blob/master/docs/Metrics.md#tuple-counting-metrics:
"The tuple counting metrics are generally reported to the metrics consumers as maps unless explicitly stated otherwise. They break down each count for finer grained reporting. The keys to these maps fall into two categories "${stream_name}" or "${upstream_component}:${stream_name}". The former is used for all spout metrics and for outgoing bolt metrics (__emit-count and __transfer-count). The latter is used for bolt metrics that deal with incoming tuples."
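Following the key convention quoted above, here is a minimal sketch of how the two key shapes could be built. `TupleCountKeys` and its methods are hypothetical helpers, not existing Storm methods. For incoming-tuple bolt metrics, the point is that the prefix must be the upstream component, not the bolt's own component.

```java
// Hypothetical helper following the documented key convention:
// "${stream_name}" for spout metrics and outgoing bolt metrics,
// "${upstream_component}:${stream_name}" for bolt metrics on incoming tuples.
final class TupleCountKeys {
    static String outgoingKey(String streamId) {
        return streamId;
    }

    static String incomingKey(String srcComponentId, String streamId) {
        return srcComponentId + ":" + streamId;
    }

    public static void main(String[] args) {
        // A bolt failing a tuple it received from "spout1" on "default":
        // the key names the *upstream* component, not the bolt itself.
        System.out.println(incomingKey("spout1", "default")); // spout1:default
        System.out.println(outgoingKey("default"));           // default
    }
}
```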
##########
File path: storm-client/test/jvm/org/apache/storm/metrics2/WindowedMeterTest.java
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import junit.framework.TestCase;
+import org.apache.storm.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WindowedMeterTest extends TestCase {
+
+ @Test
+ public void testIncrementRotate() {
+ try (Time.SimulatedTime t = new Time.SimulatedTime()) {
+ WindowedMeter meter = new WindowedMeter("meter1", new StormMetricRegistry(), "testTopology",
+ "component1", "stream1",1, 6720, 2);
+ meter.close(); // prevent task from rotating
+
+ // add 10 and verify all timecounts see it
+ meter.incBy(10);
+ Long total = new Long(10L);
+ Assert.assertEquals(total, meter.getTimeCounts().get("600"));
+ Assert.assertEquals(total, meter.getTimeCounts().get("10800"));
+ Assert.assertEquals(total, meter.getTimeCounts().get("86400"));
+ Assert.assertEquals(total, meter.getTimeCounts().get(":all-time"));
+ Assert.assertEquals(total, new Long(meter.getMeter().getCount()));
+
+ // rotate 11 minutes and validate 10 minute bucket is reduced while
+ // others remain steady.
+ Time.advanceTime(11L * 60L * 1000L);
+ meter.rotateSched();
+ Assert.assertTrue(meter.getTimeCounts().get("600") < total);
Review comment:
Is it possible to calculate the exact expected value here, rather than only checking that it decreased, to make the test more precise?
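A possible way to derive an exact expectation, sketched under an assumption that must be checked against `WindowedMeter`'s rotation code: suppose the 10-minute value takes the most recent buckets whole plus a proportional share of the bucket that crosses the window edge, and the single rotation above leaves one 11-minute bucket holding the 10 events. Then the expected count would be `(long) (10 * 600000 / 660000.0)`, i.e. 9. The helper below is hypothetical and only mirrors that assumed rule.

```java
// Hypothetical helper: ASSUMES a windowed count takes the most recent buckets
// whole and a proportional share of the bucket that crosses the window edge.
final class WindowExpectation {
    static long approximateWindowCount(long[] bucketCounts, long[] bucketTimesMs, long windowMs) {
        long remaining = windowMs;
        long total = 0;
        for (int i = 0; i < bucketCounts.length && remaining > 0; i++) {
            if (bucketTimesMs[i] > remaining) {
                // Only part of this bucket lies inside the window: take that fraction.
                total += (long) (bucketCounts[i] * ((double) remaining / bucketTimesMs[i]));
                remaining = 0;
            } else {
                total += bucketCounts[i];
                remaining -= bucketTimesMs[i];
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // 10 events recorded in a single bucket spanning 11 minutes, 10-minute window.
        long expected = approximateWindowCount(new long[] {10}, new long[] {11L * 60L * 1000L}, 600L * 1000L);
        System.out.println(expected); // 9
    }
}
```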
##########
File path:
storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatAndMetricTest.java
##########
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.metric.internal;
-
-import java.util.Map;
-import junit.framework.TestCase;
-import org.junit.Test;
-
-/**
- * Unit test for LatencyStatAndMetric
- */
-public class LatencyStatAndMetricTest extends TestCase {
Review comment:
Do we have a replacement unit test for WindowedHistogram?
##########
File path: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
##########
@@ -94,8 +86,8 @@ private static IWaitStrategy makeSystemBoltWaitStrategy() {
}
@Override
- public BoltExecutorStats getStats() {
- return stats;
+ public BoltExecutorMetrics getExecutorMetrics() {
+ return executorMetrics;
Review comment:
This essentially combines two things: the stats formerly in `BoltExecutorStats`, which are used by the executor heartbeat and displayed in the UI, and the metrics, which are sent to the monitoring platform. Should we keep them separate to make the code easier to understand and maintain?
My thought is to have `BoltExecutorMetrics` contain only metrics and `BoltExecutorStats` contain only stats. In `WindowedMeter`, the two share only the `incBy` method; everything else is separate between the meter and the stats.
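A minimal sketch of the proposed split, using hypothetical names throughout (`StatsMetricsSplit`, `StatsCounter`, `MetricsCounter`, `SplitCounter`). The stats side stands in for the windowed counts reported through the heartbeat, the metrics side stands in for a registry meter sent to monitoring, and the only shared entry point is `incBy`.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical split: stats (heartbeat/UI) and metrics (monitoring) live in
// separate objects; the only shared operation is incBy.
final class StatsMetricsSplit {
    interface Incrementable {
        void incBy(long n);
    }

    // Stand-in for the stats side (windowed counts reported through the heartbeat).
    static final class StatsCounter implements Incrementable {
        private final LongAdder windowTotal = new LongAdder();
        @Override public void incBy(long n) { windowTotal.add(n); }
        long windowTotal() { return windowTotal.sum(); }
    }

    // Stand-in for the metrics side (e.g. a registry meter sent to monitoring).
    static final class MetricsCounter implements Incrementable {
        private final LongAdder count = new LongAdder();
        @Override public void incBy(long n) { count.add(n); }
        long count() { return count.sum(); }
    }

    // Call sites update both through one method without mixing their other APIs.
    static final class SplitCounter implements Incrementable {
        private final StatsCounter stats = new StatsCounter();
        private final MetricsCounter metrics = new MetricsCounter();
        @Override public void incBy(long n) { stats.incBy(n); metrics.incBy(n); }
        StatsCounter stats() { return stats; }
        MetricsCounter metrics() { return metrics; }
    }

    public static void main(String[] args) {
        SplitCounter failed = new SplitCounter();
        failed.incBy(3);
        System.out.println(failed.stats().windowTotal()); // 3
        System.out.println(failed.metrics().count());     // 3
    }
}
```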
##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/WindowedMeter.java
##########
@@ -148,34 +120,27 @@ private synchronized void rotate(long value, long timeSpent, long targetSize, lo
}
}
- /**
- * Get time counts.
- * @return a map of time window to count. Keys are "600" for last 10 mins "10800" for the last 3 hours "86400" for the last day
- * ":all-time" for all time
- */
- public synchronized Map<String, Long> getTimeCounts() {
- return getTimeCounts(System.currentTimeMillis());
- }
-
- synchronized Map<String, Long> getTimeCounts(long now) {
+ public Map<String, Long> getTimeCounts() {
Review comment:
Should we keep the javadoc?
##########
File path: storm-client/test/jvm/org/apache/storm/metrics2/BoltExecutorMetricsTest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.storm.Config;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class BoltExecutorMetricsTest extends TestCase {
+
+ @Test
+ public void testCounts() {
Review comment:
Do we have tests for the metrics (not only the stats)?
##########
File path: storm-client/test/jvm/org/apache/storm/metrics2/WindowedMeterTest.java
##########
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import junit.framework.TestCase;
+import org.apache.storm.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WindowedMeterTest extends TestCase {
+
+ @Test
+ public void testIncrementRotate() {
+ try (Time.SimulatedTime t = new Time.SimulatedTime()) {
+ WindowedMeter meter = new WindowedMeter("meter1", new StormMetricRegistry(), "testTopology",
+ "component1", "stream1",1, 6720, 2);
+ meter.close(); // prevent task from rotating
Review comment:
This looks a little tricky: calling `close()` to prevent the task from rotating is not obvious. Others might not be aware of this, and it could confuse future developers.
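An alternative to relying on `close()` would be to let the caller choose how rotation is scheduled. This is a hypothetical sketch; none of these names exist in Storm. The meter receives a `RotationScheduler`: production could pass one backed by `java.util.Timer`, and tests could pass a manual scheduler that rotates only when the test tells it to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

final class ExplicitRotationSketch {
    // Hypothetical: the meter asks this for periodic rotation instead of
    // scheduling itself on a shared timer.
    interface RotationScheduler {
        void schedule(Runnable rotation, long periodMs);
    }

    // Production-style scheduler backed by java.util.Timer (wall-clock time).
    static RotationScheduler timerScheduler(Timer timer) {
        return (rotation, periodMs) -> timer.scheduleAtFixedRate(new TimerTask() {
            @Override public void run() { rotation.run(); }
        }, periodMs, periodMs);
    }

    // Test scheduler: records the task and runs it only when the test says so.
    static final class ManualScheduler implements RotationScheduler {
        private final List<Runnable> rotations = new ArrayList<>();
        @Override public void schedule(Runnable rotation, long periodMs) { rotations.add(rotation); }
        void rotateNow() { rotations.forEach(Runnable::run); }
    }

    static final class Meter {
        private long current;
        private long history;
        Meter(RotationScheduler scheduler, long periodMs) {
            scheduler.schedule(this::rotate, periodMs);
        }
        synchronized void incBy(long n) { current += n; }
        synchronized void rotate() { history += current; current = 0; }
        synchronized long current() { return current; }
        synchronized long history() { return history; }
    }

    public static void main(String[] args) {
        ManualScheduler scheduler = new ManualScheduler();
        Meter meter = new Meter(scheduler, 300_000L);
        meter.incBy(10);
        scheduler.rotateNow(); // the test decides when rotation happens
        System.out.println(meter.current() + " " + meter.history()); // 0 10
    }
}
```

This makes "no automatic rotation in tests" an explicit constructor argument rather than a side effect of `close()`.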
##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
##########
@@ -74,6 +74,13 @@
return gauge;
}
+ public Meter meter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
Review comment:
Maybe we should remove the `streamId` parameter everywhere, since the stream ID is already part of the metric name.
Alternatively, we could explicitly combine metricName + srcComponent + streamId into the new metric name in the `workerMetricName` method, so that names are built consistently (skipping any part that is `null`).
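A minimal sketch of the second option. `MetricNameSketch.combinedName` is hypothetical (it is not Storm's `workerMetricName`), and the `:` separator is an assumption chosen only for illustration.

```java
import java.util.StringJoiner;

// Hypothetical helper, not Storm's workerMetricName: builds one metric name
// from its parts and skips any part that is null. The ":" separator is assumed.
final class MetricNameSketch {
    static String combinedName(String metricName, String srcComponentId, String streamId) {
        StringJoiner joiner = new StringJoiner(":");
        for (String part : new String[] {metricName, srcComponentId, streamId}) {
            if (part != null) {
                joiner.add(part);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(combinedName("__fail-count", "spout1", "default")); // __fail-count:spout1:default
        System.out.println(combinedName("__emit-count", null, "default"));     // __emit-count:default
    }
}
```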
##########
File path: storm-client/test/jvm/org/apache/storm/metrics2/BoltExecutorMetricsTest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.storm.Config;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class BoltExecutorMetricsTest extends TestCase {
+
+ @Test
+ public void testCounts() {
+ WorkerTopologyContext workerTopologyContext = Mockito.mock(WorkerTopologyContext.class);
+ Map<String, Object> topoConf = new HashMap<>();
+ topoConf.put(Config.NUM_STAT_BUCKETS, 20);
+ topoConf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, 0.05);
+ BoltExecutorMetrics boltExecutorMetrics = new BoltExecutorMetrics(workerTopologyContext, new StormMetricRegistry(), topoConf);
+
+ validateExecutorMetrics(boltExecutorMetrics);
+
+ // test fail
+ boltExecutorMetrics.boltFailedTuple("component1", "stream1", 1);
+ Long stream1Count = boltExecutorMetrics.getFailTimeCounts().get("600").get("[component1, stream1]");
+ Assert.assertEquals(new Long(20L),stream1Count);
+ Long stream2Count = boltExecutorMetrics.getFailTimeCounts().get("600").get("[component1, stream2]");
+ Assert.assertNull(stream2Count);
+
+ // test ack
+ boltExecutorMetrics.boltAckedTuple("component1", "stream1", 1, 10);
+ boltExecutorMetrics.boltAckedTuple("component1", "stream1", 1, 20);
+ boltExecutorMetrics.boltAckedTuple("component1", "stream2", 2, 222);
+ stream1Count = boltExecutorMetrics.getAckTimeCounts().get("600").get("[component1, stream1]");
+ Assert.assertEquals(new Long(40L), stream1Count);
Review comment:
I'm a little lost: why is this 40?
If it's because of sampling, maybe set the sample rate to 1 so the test gets an exact result.
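A possible explanation, inferred from the numbers in the test rather than from the implementation: with `TOPOLOGY_STATS_SAMPLE_RATE` set to 0.05, each recorded event may be weighted as 1 / 0.05 = 20 events. That would match 20 for the single failure and 40 for the two acks on `stream1`. The hypothetical helper below only spells out that arithmetic.

```java
// Hypothetical helper: the scaled count if each recorded event is weighted by
// 1 / sampleRate (an inference from the test values, not confirmed Storm code).
final class SampledCount {
    static long scaledCount(long recordedEvents, double sampleRate) {
        long weight = Math.round(1.0 / sampleRate); // 0.05 -> 20
        return recordedEvents * weight;
    }

    public static void main(String[] args) {
        System.out.println(scaledCount(1, 0.05)); // 20: one boltFailedTuple call
        System.out.println(scaledCount(2, 0.05)); // 40: two boltAckedTuple calls on stream1
        System.out.println(scaledCount(2, 1.0));  // 2: with a sample rate of 1 the count is exact
    }
}
```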
##########
File path: storm-client/src/jvm/org/apache/storm/metrics2/WindowedHistogram.java
##########
@@ -1,38 +1,31 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
*/
-package org.apache.storm.metric.internal;
+package org.apache.storm.metrics2;
+import com.codahale.metrics.Histogram;
import java.util.HashMap;
import java.util.Map;
import java.util.TimerTask;
-import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.internal.MetricStatTimer;
+import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
/**
- * Acts as a Latency Metric, but also keeps track of approximate latency for the last 10 mins, 3 hours, 1 day, and all time.
+ * Acts as a Histogram, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time.
*/
-public class LatencyStatAndMetric implements IMetric {
- //The current lat and count buckets are protected by a different lock
Review comment:
By removing this, are we saying that sharing a single lock no longer causes much lock contention?
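For reference, the separate-lock pattern that the removed comment described can be sketched as follows. `TwoLockLatencyWindow` is hypothetical and is not the removed class itself. The per-tuple hot path only takes a small lock guarding the current bucket; rotation holds that lock just long enough to swap the bucket out, then updates history under a second lock.

```java
// Hypothetical sketch of two-lock bucketing: recording latencies only contends
// on currentLock; rotation takes currentLock just to swap the bucket out, then
// updates history under historyLock.
final class TwoLockLatencyWindow {
    private final Object currentLock = new Object();
    private final Object historyLock = new Object();
    private long currentLatencySum;   // guarded by currentLock
    private long currentCount;        // guarded by currentLock
    private long historyLatencySum;   // guarded by historyLock
    private long historyCount;        // guarded by historyLock

    // Hot path: called for every tuple.
    void record(long latencyMs) {
        synchronized (currentLock) {
            currentLatencySum += latencyMs;
            currentCount++;
        }
    }

    // Periodic path: moves the current bucket into history.
    void rotate() {
        long sum;
        long count;
        synchronized (currentLock) {
            sum = currentLatencySum;
            count = currentCount;
            currentLatencySum = 0;
            currentCount = 0;
        }
        synchronized (historyLock) {
            historyLatencySum += sum;
            historyCount += count;
        }
    }

    double historyAverage() {
        synchronized (historyLock) {
            return historyCount == 0 ? 0.0 : (double) historyLatencySum / historyCount;
        }
    }

    public static void main(String[] args) {
        TwoLockLatencyWindow window = new TwoLockLatencyWindow();
        window.record(10);
        window.record(30);
        window.rotate();
        System.out.println(window.historyAverage()); // 20.0
    }
}
```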
----------------------------------------------------------------
This is an automated message from the Apache Git Service.