junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465921001



##########
File path: 
clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(10);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 13 credits
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 5, expect 45 credits
+        time.sleep(2000);
+        tk.record(config, 5, time.milliseconds());
+        assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect -5 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+    }
+
+    @Test
+    public void testUnrecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units

Review comment:
       we have 10 samples now.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Quota;
+
+import static org.apache.kafka.common.metrics.internals.MetricsUtils.convert;
+
+/**
+ * The {@link TokenBucket} is a {@link MeasurableStat} implementing a token 
bucket algorithm
+ * that is usable within a {@link org.apache.kafka.common.metrics.Sensor}.
+ *
+ * The {@link Quota#bound()} defined the refill rate of the bucket while the 
maximum burst or
+ * the maximum number of credits of the bucket is defined by
+ * {@link MetricConfig#samples() * MetricConfig#timeWindowMs() * 
Quota#bound()}.
+ *
+ * The quota is considered as exhausted when the amount of remaining credits 
in the bucket

Review comment:
       Could we document how this quota behaves differently from existing quota?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units

Review comment:
       we have 10 samples now.

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));

Review comment:
       Is the test based on 10 samples?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to