[ 
https://issues.apache.org/jira/browse/ACCUMULO-4187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15238575#comment-15238575
 ] 

ASF GitHub Bot commented on ACCUMULO-4187:
------------------------------------------

Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/accumulo/pull/90#discussion_r59490681
  
    --- Diff: 
server/base/src/main/java/org/apache/accumulo/server/util/ratelimit/SharedRateLimiterFactory.java
 ---
    @@ -0,0 +1,152 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.accumulo.server.util.ratelimit;
    +
    +import com.google.common.collect.ImmutableList;
    +import java.util.List;
    +import java.util.WeakHashMap;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.atomic.LongAdder;
    +import org.apache.accumulo.core.conf.AccumuloConfiguration;
    +import org.apache.accumulo.core.util.ratelimit.GuavaRateLimiter;
    +import org.apache.accumulo.core.util.ratelimit.RateLimiter;
    +import org.apache.accumulo.server.util.time.SimpleTimer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class SharedRateLimiterFactory {
    +  private static final long REPORT_RATE = 60000;
    +  private static final long UPDATE_RATE = 1000;
    +  private static SharedRateLimiterFactory instance = null;
    +  private final Logger logger = 
LoggerFactory.getLogger(SharedRateLimiterFactory.class);
    +  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new 
WeakHashMap<>();
    +
    +  private SharedRateLimiterFactory() {}
    +
    +  public static SharedRateLimiterFactory getInstance(SimpleTimer timer) {
    +    synchronized (SharedRateLimiterFactory.class) {
    +      if (instance == null) {
    +        instance = new SharedRateLimiterFactory();
    +
    +        // Update periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.update();
    +          }
    +        }, UPDATE_RATE, UPDATE_RATE);
    +
    +        // Report periodically
    +        timer.schedule(new Runnable() {
    +          @Override
    +          public void run() {
    +            instance.report();
    +          }
    +        }, REPORT_RATE, REPORT_RATE);
    +      }
    +      return instance;
    +    }
    +  }
    +
    +  public static SharedRateLimiterFactory getInstance(AccumuloConfiguration 
conf) {
    +    return getInstance(SimpleTimer.getInstance(conf));
    +  }
    +
    +  /**
    +   * Lookup the RateLimiter associated with the specified name, or create 
a new one for that name. RateLimiters should be closed when no longer needed.
    +   *
    +   * @param name
    +   *          key for the rate limiter
    +   * @param rateGenerator
    +   *          a function which can be called to get what the current rate 
for the rate limiter should be.
    +   */
    +  public RateLimiter create(String name, Callable<Long> rateGenerator) {
    +    synchronized (activeLimiters) {
    +      if (activeLimiters.containsKey(name)) {
    +        SharedRateLimiter limiter = activeLimiters.get(name);
    +        return limiter;
    +      } else {
    +        long initialRate;
    +        try {
    +          initialRate = rateGenerator.call();
    +        } catch (Exception ex) {
    +          throw new IllegalStateException(ex);
    +        }
    +        SharedRateLimiter limiter = new SharedRateLimiter(name, 
rateGenerator, initialRate);
    +        activeLimiters.put(name, limiter);
    +        return limiter;
    +      }
    +    }
    +  }
    +
    +  protected void update() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.update();
    +    }
    +  }
    +
    +  protected void report() {
    +    List<SharedRateLimiter> limiters;
    +    synchronized (activeLimiters) {
    +      limiters = ImmutableList.copyOf(activeLimiters.values());
    +    }
    +    for (SharedRateLimiter limiter : limiters) {
    +      limiter.report();
    +    }
    +  }
    +
    +  protected class SharedRateLimiter extends GuavaRateLimiter {
    +    private final LongAdder permitsAcquired = new LongAdder();
    +    private final Callable<Long> rateCallable;
    +    private final String name;
    +
    +    SharedRateLimiter(String name, Callable<Long> rateCallable, long 
initialRate) {
    +      super(initialRate);
    +      this.name = name;
    +      this.rateCallable = rateCallable;
    +    }
    +
    +    @Override
    +    public void acquire(long permits) {
    +      super.acquire(permits);
    +      permitsAcquired.add(permits);
    +    }
    +
    +    public void update() {
    +      try {
    +        // Reset rate if needed
    +        long rate = rateCallable.call();
    +        if (rate != getRate()) {
    +          setRate(rate);
    +        }
    +      } catch (Exception ex) {
    +        logger.debug("Failed to update rate limiter", ex);
    +      }
    +    }
    +
    +    public void report() {
    +      long sum = permitsAcquired.sumThenReset();
    +      if (sum > 0) {
    +        logger.debug(String.format("RateLimiter '%s': %,d of %,d 
permits/second", name, sum * 1000L / REPORT_RATE, getRate()));
    --- End diff --
    
    Wrap this in a `logger.isDebugEnabled()` conditional, please. Typically, 
Logger instances are named `log`, not `logger`. You could also do the string 
formatting "natively" with SLF4J's `{}` replacement syntax.


> Rate limiting of major compactions
> ----------------------------------
>
>                 Key: ACCUMULO-4187
>                 URL: https://issues.apache.org/jira/browse/ACCUMULO-4187
>             Project: Accumulo
>          Issue Type: Improvement
>          Components: core, tserver
>    Affects Versions: 1.8.0
>            Reporter: Shawn Walker
>            Assignee: Shawn Walker
>            Priority: Minor
>             Fix For: 1.8.0
>
>
> In discussing [ACCUMULO-4166] with Keith Turner, we decided that the 
> underlying issue is that major compactions can overwhelm a tablet server, 
> rendering it nearly unresponsive.
> To address this, we should take a cue from Apache Cassandra and restrict how 
> quickly we perform major compactions.  Rate limiting reads and writes 
> involved in major compactions will directly affect the IO load caused by 
> major compactions, and should also indirectly affect the CPU load.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to