接口限制调用次数

大体思路 - 常用的限流算法

漏桶算法

漏桶算法(Leaky Bucket):主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。漏桶算法的示意图如下:

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

漏桶算法示意图

令牌桶算法

令牌桶算法(Token Bucket):是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图2所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

令牌桶算法示意图

其他细节

两者主要区别在于“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输速率外,还允许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,所以它适合于具有突发特性的流量。

一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

在实施QOS策略时,可以将用户的数据限制在特定的带宽,当用户的流量超过额定带宽时,超过的带宽将采取其它方式来处理。

https://mp.weixin.qq.com/s?__biz=MzI4MTY5NTk4Ng==&mid=2247488993&idx=1&sn=4b9d5deedd0e626c456744f04b499bbb&source=41#wechat_redirect

http://3ms.huawei.com/km/blogs/details/6509003

一些实现

Guava - RateLimiter

Guava是google提供的java扩展类库,其中的限流工具类RateLimiter采用的就是令牌桶算法。RateLimiter 从概念上来讲,速率限制器会在可配置的速率下分配许可证,如果必要的话,每个acquire()会阻塞当前线程直到许可证可用后获取该许可证,一旦获取到许可证,不需要再释放许可证。

http://ifeve.com/guava-ratelimiter/

通俗的讲RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。例如我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个,此时可以采用如下方式:

final RateLimiter rateLimiter = RateLimiter.create(2.0);

void submitTasks(List task, Executor executor) {
    for (Runnable task : tasks) {
        // 也许需要等待
        rateLimiter.acquire();
        executor.execute(task);
    }
}

有一点很重要,那就是请求的许可数从来不会影响到请求本身的限制(调用acquire(1) 和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter 并不提供公平性的保证。

public double acquire() {
        return acquire(1);
    }

 public double acquire(int permits) {
        checkPermits(permits);  //检查参数是否合法(是否大于0)
        long microsToWait;
        synchronized (mutex) { //应对并发情况需要同步
            microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间 
        }
        ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0
        return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }

private long reserveNextTicket(double requiredPermits, long nowMicros) {
        resync(nowMicros); //补充令牌
        long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目
        double freshPermits = requiredPermits - storedPermitsToSpend;

        long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros); 

        this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
        this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌
        return microsToNextFreeTicket;
    }

private void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }

使用Semphore进行并发流控

Java 并发库的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可:

public class SemaphoreLimiter {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 20; i++) {
            final int no = i;
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        System.out.println("Accessing: " + no);
                        Thread.sleep((long) (Math.random() * 10000));
                        semaphore.release();
                        System.out.println("*" + semaphore.availablePermits());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(run);
        }
        executor.shutdown();
    }
}

通过AtomicLong计数器控制

try {
    if(atomic.incrementAndGet() > 限流数) {
        //拒绝请求
   }
    //处理请求
} finally {
    atomic.decrementAndGet();
}

请求记数法

请求记数法顾名思义就是统计单位时间内请求的次数,如果超出次数限制则对请求做限流处理。

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class mytst {
    //单位时间内的调用次数
    private int flag = 10;
    //线程数量
    private int threadNum = 10;
    //单位时间1000ms * 60 = 1min
    private int timeRound = 1000*60;
    //用来标记调用次数
    private AtomicInteger num = new AtomicInteger(0);

    public static void main(String[] args) {

        new mytst().call();
    }

    private void call() {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);

        Timer timer = new Timer();
        timer.schedule(new TimerTask(){
          @Override
          public void run() {
              num.set(0);;
          }
        }, 0, timeRound);

        for (int i = 0; i < 1000; i++) {
            executor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"   进来了!");
                    num.incrementAndGet();
                    if (num.get() <= flag) {
                        System.out.println(Thread.currentThread().getName()+"   执行任务!");
                    } else {
                        System.out.println(Thread.currentThread().getName()+"   执行任务失败!调用超限!");
                    }
                }
            });
        }

        executor.shutdown();
    }
}

一种令牌桶QoS实现

将CIR(用户传输数据的速率被称为承诺信息速率)设置为8000bit/s,那么就必须每秒将8000个令牌放入桶中,当接口有数据通过时,就从桶中移除相应的令牌,每通过1 bit,就从桶中移除1个令牌。当桶里没有令牌的时候,任何流量都被视为超出额定带宽,而超出的流量就要采取额外动作。每秒钟往桶里加的令牌就决定了用户流量的速率,这个速率就是CIR,但是每秒钟需要往桶里加的令牌总数,并不是一次性加完的,一次性加进的令牌数量被称为Burst size(Bc),如果Bc只是CIR的一半,那么很明显每秒钟就需要往桶里加两次令牌,每次加的数量总是Bc的数量。还有就是加令牌的时间,Time interval(Tc),Tc表示多久该往桶里加一次令牌,而这个时间并不能手工设置,因为这个时间可以靠CIR和Bc的关系计算得到,Bc/CIR= Tc。

令牌桶算法图例

a. 按特定的速率向令牌桶投放令牌
b. 根据预设的匹配规则先对报文进行分类,不符合匹配规则的报文不需要经过令牌桶的处理,直接发送;
c. 符合匹配规则的报文,则需要令牌桶进行处理。当桶中有足够的令牌则报文可以被继续发送下去,同时令牌桶中的令牌 量按报文的长度做相应的减少;
d. 当令牌桶中的令牌不足时,报文将不能被发送,只有等到桶中生成了新的令牌,报文才可以发送。这就可以限制报文的流量只能是小于等于令牌生成的速度,达到限制流量的目的。

package com.netease.datastream.util.flowcontrol;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


/**
 * <pre>
 * Created by inter12 on 15-3-18.
 * </pre>
 */
public class TokenBucket {

    // 默认桶大小个数 即最大瞬间流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

    // 一个桶的单位是1字节
    private int everyTokenSize = 1;

    // 瞬间最大流量
    private int maxFlowRate;

    // 平均流量
    private int avgFlowRate;

    // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
    // 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
            DEFAULT_BUCKET_SIZE);

    private ScheduledExecutorService scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor();

    private volatile boolean isStart = false;

    private ReentrantLock lock = new ReentrantLock(true);

    private static final byte A_CHAR = 'a';

    public TokenBucket() {
    }

    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public void addTokens(Integer tokenNum) {

        // 若是桶已经满了,就不再家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(Byte.valueOf(A_CHAR));
        }
    }

    public TokenBucket build() {

        start();
        return this;
    }

    /**
     * 获取足够的令牌个数
     * 
     * @return
     */
    public boolean getTokens(byte[] dataSize) {

//        Preconditions.checkNotNull(dataSize);
//        Preconditions.checkArgument(isStart,
//                "please invoke start method first !");

        int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
            if (!result) {
                return false;
            }

            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }

            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }

    public void start() {

        // 初始化桶队列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
        }

        // 初始化令牌生产者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
                TimeUnit.SECONDS);
        isStart = true;

    }

    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }

    public boolean isStarted() {
        return isStart;
    }

    class TokenProducer implements Runnable {

        private int avgFlowRate;
        private TokenBucket tokenBucket;

        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }

        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }

    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }

    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }

    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }

    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }

    private String stringCopy(String data, int copyNum) {

        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }

        return sbuilder.toString();

    }

    public static void main(String[] args) throws IOException,
            InterruptedException {

        tokenTest();
    }

    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
                10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }

    private static void tokenTest() throws InterruptedException, IOException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
                .maxFlowRate(1024).build();

        BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
        String data = "xxxx";// 四个字节
        for (int i = 1; i <= 1000; i++) {

            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
                    i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                bufferedWriter.write("token pass --- index:" + i1);
                System.out.println("token pass --- index:" + i1);
            } else {
                bufferedWriter.write("token rejuect --- index" + i1);
                System.out.println("token rejuect --- index" + i1);
            }

            bufferedWriter.newLine();
            bufferedWriter.flush();
        }

        bufferedWriter.close();
    }

}

限流某个接口的时间窗请求数

即一个时间窗口内的请求数,如想限制某个接口/服务每秒/每分钟/每天的请求数/调用量。如一些基础服务会被很多其他系统调用,比如商品详情页服务会调用基础商品服务调用,但是怕因为更新量比较大将基础服务打挂,这时我们要对每秒/每分钟的调用量进行限速;一种实现方式如下所示:

LoadingCache<Long, AtomicLong> counter = CacheBuilder.newBuilder()
        .expireAfterWrite(2, TimeUnit.SECONDS)
        .build(new CacheLoader<Long, AtomicLong>() {
            @Override
            public AtomicLong load(Long seconds) throws Exception {
                return new AtomicLong(0);
            }
        });
long limit = 1000;
while (true) {
    // 得到当前秒
    long currentSeconds = System.currentTimeMillis() / 1000;
    if (counter.get(currentSeconds).incrementAndGet() > limit) {
        System.out.println("限流了:" + currentSeconds);
        continue;
    }
    // 业务处理
    System.out.println("业务处理");
}

我们使用Guava的Cache来存储计数器,过期时间设置为2秒(保证1秒内的计数器是有的),然后我们获取当前时间戳然后取秒数来作为KEY进行计数统计和限流,这种方式也是简单粗暴,刚才说的场景够用了。

配置的存储

  1. Apollo
  2. MySQL
  3. 代码

做个功能开关,开关放在Apollo中,配置放在MySQL

DEMO

测试代码:

package com.riddle.RateLimiter;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.math.NumberUtils;

import java.util.concurrent.TimeUnit;

/**
 * @author w00585603
 */
public class RequestLimiter {
    /**
     * 每秒发送请求限制
     */
    private static final String REQUEST_LIMIT_PER_SECONDS = "2";

    private static RateLimiter rateLimiter;

    public static void init(String limit) {
        int requestLimit = NumberUtils.toInt(limit, 5);
        rateLimiter = RateLimiter.create(requestLimit);
        System.out.println("REQUEST LIMIT PER SECONDS: " + rateLimiter.getRate());
    }

    /**
     * 模拟外部接口调用
     */
    public static String job(int req) throws Exception {
        int rsp = req * 2 + req;
         Thread.sleep(100L);
        return String.valueOf(rsp);
    }

    public static void main(String[] args) throws Exception {
        init(REQUEST_LIMIT_PER_SECONDS);
        Thread.sleep(1000L);

        for (int i = 0; i < 50; ++i) {
//            rateLimiter.acquire();
//            System.out.println("req: " + i + " ,rsp is: " + job(i));

            if (!rateLimiter.tryAcquire(100L, TimeUnit.MILLISECONDS)) {
                System.out.println("req: " + i + " ,rsp is: null");
            } else {
                System.out.println("req: " + i + " ,rsp is: " + job(i));
            }
            Thread.sleep(200L);
        }
    }
}

输出:

REQUEST LIMIT PER SECONDS: 2.0
req: 0 ,rsp is: 0
req: 1 ,rsp is: 3
req: 2 ,rsp is: 6
req: 3 ,rsp is: 9
req: 4 ,rsp is: 12
req: 5 ,rsp is: 15
req: 6 ,rsp is: 18
req: 7 ,rsp is: null
req: 8 ,rsp is: 24
req: 9 ,rsp is: null
req: 10 ,rsp is: 30
req: 11 ,rsp is: null
req: 12 ,rsp is: 36
req: 13 ,rsp is: 39
req: 14 ,rsp is: null
req: 15 ,rsp is: 45
req: 16 ,rsp is: null
req: 17 ,rsp is: 51
req: 18 ,rsp is: null
req: 19 ,rsp is: 57
req: 20 ,rsp is: 60
req: 21 ,rsp is: null
req: 22 ,rsp is: 66
req: 23 ,rsp is: null
req: 24 ,rsp is: 72
req: 25 ,rsp is: null
req: 26 ,rsp is: 78
req: 27 ,rsp is: 81
req: 28 ,rsp is: null
req: 29 ,rsp is: 87
req: 30 ,rsp is: null
req: 31 ,rsp is: 93
req: 32 ,rsp is: null
req: 33 ,rsp is: 99
req: 34 ,rsp is: 102
req: 35 ,rsp is: null
req: 36 ,rsp is: 108
req: 37 ,rsp is: null
req: 38 ,rsp is: 114
req: 39 ,rsp is: null
req: 40 ,rsp is: 120
req: 41 ,rsp is: 123
req: 42 ,rsp is: null
req: 43 ,rsp is: 129
req: 44 ,rsp is: null
req: 45 ,rsp is: 135
req: 46 ,rsp is: null
req: 47 ,rsp is: 141
req: 48 ,rsp is: 144
req: 49 ,rsp is: null

进程已结束,退出代码为 0

标签: none

添加新评论