junrao commented on a change in pull request #9114:
URL: https://github.com/apache/kafka/pull/9114#discussion_r465310710



##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {

Review comment:
       Could we add a high level description of the class?

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is 
already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is 
still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 
30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", 
"test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below 
zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as 
expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);

Review comment:
       sensor.record() always calls checkQuotas(). Why do we need to call it 
explicitly here?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 13 credits
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 5, expect 45 credits
+        time.sleep(2000);
+        tk.record(config, 5, time.milliseconds());
+        assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect -5 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+    }
+
+    @Test
+    public void testUnrecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 100 credits

Review comment:
       We are recording -60.

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -131,6 +133,22 @@ class PermissiveControllerMutationQuota(private val time: 
Time,
 
 object ControllerMutationQuotaManager {
   val QuotaControllerMutationDefault = Int.MaxValue.toDouble
+
+  /**
+   * This calculates the amount of time needed to bring the TokenBucket within 
quota
+   * assuming that no new metrics are recorded.
+   *
+   * Basically, if a value < 0 is observed, the time required to bring it to 
zero is
+   * -value / refill rate (quota bound) * 1000.
+   */
+  def throttleTime(e: QuotaViolationException, timeMs: Long): Long = {

Review comment:
       throttleTime => throttleTimeMs ?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double tokens;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.tokens = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        return this.tokens;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final 
long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        this.tokens = Math.min(burst, this.tokens - value);
+    }
+
+    private void refill(final double quota, final double burst, final long 
timeMs) {
+        this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - 
lastUpdateMs));
+        this.lastUpdateMs = timeMs;
+    }
+
+    private double burst(final MetricConfig config) {
+        return (config.samples() - 1) * convert(config.timeWindowMs()) * 
config.quota().bound();
+    }
+
+    private double convert(final long timeMs) {
+        switch (unit) {

Review comment:
       This code is duplicated from Rate. Could we reuse it somehow?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/TokenBucket.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+public class TokenBucket implements MeasurableStat {
+    private final TimeUnit unit;
+    private double tokens;
+    private long lastUpdateMs;
+
+    public TokenBucket() {
+        this(TimeUnit.SECONDS);
+    }
+
+    public TokenBucket(TimeUnit unit) {
+        this.unit = unit;
+        this.tokens = 0;
+        this.lastUpdateMs = 0;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long timeMs) {
+        if (config.quota() == null)
+            return Long.MAX_VALUE;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        return this.tokens;
+    }
+
+    @Override
+    public void record(final MetricConfig config, final double value, final 
long timeMs) {
+        if (config.quota() == null)
+            return;
+        final double quota = config.quota().bound();
+        final double burst = burst(config);
+        refill(quota, burst, timeMs);
+        this.tokens = Math.min(burst, this.tokens - value);
+    }
+
+    private void refill(final double quota, final double burst, final long 
timeMs) {
+        this.tokens = Math.min(burst, this.tokens + quota * convert(timeMs - 
lastUpdateMs));
+        this.lastUpdateMs = timeMs;
+    }
+
+    private double burst(final MetricConfig config) {
+        return (config.samples() - 1) * convert(config.timeWindowMs()) * 
config.quota().bound();

Review comment:
       Rate actually allows the windowSize to be close to the full samples * 
perSampleWindow. The logic around `config.samples() - 1` is just to make sure 
the windowSize contains at least that many full windows. So, to match that 
behavior, it seems that burst should use `config.samples()`.

##########
File path: core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala
##########
@@ -156,6 +174,24 @@ class ControllerMutationQuotaManager(private val config: 
ClientQuotaManagerConfi
       quotaMetricTags.asJava)
   }
 
+  protected def clientTokenBucketMetricName(quotaMetricTags: Map[String, 
String]): MetricName = {

Review comment:
       Could this be private?

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,95 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
+
+    @Test
+    public void testStrictQuotaEnforcementWithRate() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is 
already
+        // above the quota.
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is 
still equal
+        // to 3 after 5s.
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+        assertThrows(QuotaViolationException.class, () -> strictRecord(sensor, 
30, time.milliseconds()));
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucket() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("credits", 
"test-group");
+        assertTrue(sensor.add(metricName, new TokenBucket()));
+        final KafkaMetric tkMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the remaining credits below 
zero
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-10, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the 
define quota:
+        // 10 / 2 = 5s
+        time.sleep(5000);
+
+        // Unlike the default rate based on a windowed sum, it works as 
expected.
+        assertEquals(0, tkMetric.measurableValue(time.milliseconds()), 0.1);
+        strictRecord(sensor, 30, time.milliseconds());
+        assertEquals(-30, tkMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    private void strictRecord(Sensor sensor, double value, long timeMs) {
+        synchronized (sensor) {
+            sensor.checkQuotas(timeMs);
+            sensor.record(value, timeMs, false);
+        }
+    }
+
+    @Test
+    public void testRecordAndCheckQuotaUseMetricConfigOfEachStat() {
+        final Time time = new MockTime(0, System.currentTimeMillis(), 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor");
+
+        final MeasurableStat stat1 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat1Name = metrics.metricName("stat1", "test-group");
+        final MetricConfig stat1Config = new 
MetricConfig().quota(Quota.upperBound(5));
+        sensor.add(stat1Name, stat1, stat1Config);
+
+        final MeasurableStat stat2 = Mockito.mock(MeasurableStat.class);
+        final MetricName stat2Name = metrics.metricName("stat2", "test-group");
+        final MetricConfig stat2Config = new 
MetricConfig().quota(Quota.upperBound(10));
+        sensor.add(stat2Name, stat2, stat2Config);
+
+        sensor.record(10, 1);
+        Mockito.verify(stat1).record(stat1Config, 10, 1);
+        Mockito.verify(stat2).record(stat2Config, 10, 1);
+
+        Mockito.when(stat1.measure(stat1Config, 2)).thenReturn(2.0);
+        Mockito.when(stat2.measure(stat2Config, 2)).thenReturn(2.0);
+        sensor.checkQuotas(2);

Review comment:
       Here, we are just verifying there is no quota exception?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/metrics/TokenBucketTest.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.metrics.stats.TokenBucket;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TokenBucketTest {
+    Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, System.currentTimeMillis(), System.nanoTime());
+    }
+
+    @Test
+    public void testRecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 13 credits
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 5, expect 45 credits
+        time.sleep(2000);
+        tk.record(config, 5, time.milliseconds());
+        assertEquals(45, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect -5 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(-5, tk.measure(config, time.milliseconds()), 0.1);
+    }
+
+    @Test
+    public void testUnrecord() {
+        // Rate  = 5 unit / sec
+        // Burst = 2 * (11 - 1) = 20 units
+        MetricConfig config = new MetricConfig()
+            .quota(Quota.upperBound(5))
+            .timeWindow(2, TimeUnit.SECONDS)
+            .samples(11);
+
+        TokenBucket tk = new TokenBucket();
+
+        // Expect 100 credits at T
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Record 60 at T, expect 100 credits
+        tk.record(config, -60, time.milliseconds());
+        assertEquals(100, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect 40 credits
+        time.sleep(2000);
+        tk.record(config, 60, time.milliseconds());
+        assertEquals(40, tk.measure(config, time.milliseconds()), 0.1);
+
+        // Advance by 2s, record 60, expect 100 credits

Review comment:
       We are recording -60.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to