接口限制调用次数
接口限制调用次数
大体思路 - 常用的限流算法
漏桶算法
漏桶算法(Leaky Bucket):主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。漏桶算法的示意图如下:
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
令牌桶算法
令牌桶算法(Token Bucket):是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图2所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
其他细节
两者主要区别在于“漏桶算法”能够强行限制数据的传输速率,而“令牌桶算法”在能够限制数据的平均传输速率外,还允许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允许突发地传输数据直到达到用户配置的门限,所以它适合于具有突发特性的流量。
一般开发高并发系统常见的限流有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
在实施QOS策略时,可以将用户的数据限制在特定的带宽,当用户的流量超过额定带宽时,超过的带宽将采取其它方式来处理。
http://3ms.huawei.com/km/blogs/details/6509003
一些实现
Guava - RateLimiter
Guava是google提供的java扩展类库,其中的限流工具类RateLimiter采用的就是令牌桶算法。RateLimiter 从概念上来讲,速率限制器会在可配置的速率下分配许可证,如果必要的话,每个acquire()
会阻塞当前线程直到许可证可用后获取该许可证,一旦获取到许可证,不需要再释放许可证。
http://ifeve.com/guava-ratelimiter/
通俗的讲RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,比如你希望自己的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。例如我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个,此时可以采用如下方式:
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List task, Executor executor) {
for (Runnable task : tasks) {
// 也许需要等待
rateLimiter.acquire();
executor.execute(task);
}
}
有一点很重要,那就是请求的许可数从来不会影响到请求本身的限制(调用acquire(1) 和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter 并不提供公平性的保证。
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
checkPermits(permits); //检查参数是否合法(是否大于0)
long microsToWait;
synchronized (mutex) { //应对并发情况需要同步
microsToWait = reserveNextTicket(permits, readSafeMicros()); //获得需要等待的时间
}
ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0
return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
}
private long reserveNextTicket(double requiredPermits, long nowMicros) {
resync(nowMicros); //补充令牌
long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取这次请求消耗的令牌数目
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌
return microsToNextFreeTicket;
}
private void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
storedPermits = Math.min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
nextFreeTicketMicros = nowMicros;
}
}
使用Semphore进行并发流控
Java 并发库的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可:
public class SemaphoreLimiter {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 20; i++) {
final int no = i;
Runnable run = new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Accessing: " + no);
Thread.sleep((long) (Math.random() * 10000));
semaphore.release();
System.out.println("*" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.execute(run);
}
executor.shutdown();
}
}
通过AtomicLong计数器控制
try {
if(atomic.incrementAndGet() > 限流数) {
//拒绝请求
}
//处理请求
} finally {
atomic.decrementAndGet();
}
请求记数法
请求记数法顾名思义就是统计单位时间内请求的次数,如果超出次数限制则对请求做限流处理。
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class mytst {
//单位时间内的调用次数
private int flag = 10;
//线程数量
private int threadNum = 10;
//单位时间1000ms * 60 = 1min
private int timeRound = 1000*60;
//用来标记调用次数
private AtomicInteger num = new AtomicInteger(0);
public static void main(String[] args) {
new mytst().call();
}
private void call() {
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
Timer timer = new Timer();
timer.schedule(new TimerTask(){
@Override
public void run() {
num.set(0);;
}
}, 0, timeRound);
for (int i = 0; i < 1000; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" 进来了!");
num.incrementAndGet();
if (num.get() <= flag) {
System.out.println(Thread.currentThread().getName()+" 执行任务!");
} else {
System.out.println(Thread.currentThread().getName()+" 执行任务失败!调用超限!");
}
}
});
}
executor.shutdown();
}
}
一种令牌桶QoS实现
将CIR(用户传输数据的速率被称为承诺信息速率)设置为8000bit/s,那么就必须每秒将8000个令牌放入桶中,当接口有数据通过时,就从桶中移除相应的令牌,每通过1 bit,就从桶中移除1个令牌。当桶里没有令牌的时候,任何流量都被视为超出额定带宽,而超出的流量就要采取额外动作。每秒钟往桶里加的令牌就决定了用户流量的速率,这个速率就是CIR,但是每秒钟需要往桶里加的令牌总数,并不是一次性加完的,一次性加进的令牌数量被称为Burst size(Bc),如果Bc只是CIR的一半,那么很明显每秒钟就需要往桶里加两次令牌,每次加的数量总是Bc的数量。还有就是加令牌的时间,Time interval(Tc),Tc表示多久该往桶里加一次令牌,而这个时间并不能手工设置,因为这个时间可以靠CIR和Bc的关系计算得到,Bc/CIR= Tc。
a. 按特定的速率向令牌桶投放令牌
b. 根据预设的匹配规则先对报文进行分类,不符合匹配规则的报文不需要经过令牌桶的处理,直接发送;
c. 符合匹配规则的报文,则需要令牌桶进行处理。当桶中有足够的令牌则报文可以被继续发送下去,同时令牌桶中的令牌 量按报文的长度做相应的减少;
d. 当令牌桶中的令牌不足时,报文将不能被发送,只有等到桶中生成了新的令牌,报文才可以发送。这就可以限制报文的流量只能是小于等于令牌生成的速度,达到限制流量的目的。
package com.netease.datastream.util.flowcontrol;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* <pre>
* Created by inter12 on 15-3-18.
* </pre>
*/
public class TokenBucket {
// 默认桶大小个数 即最大瞬间流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一个桶的单位是1字节
private int everyTokenSize = 1;
// 瞬间最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
// 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已经满了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 获取足够的令牌个数
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
// Preconditions.checkNotNull(dataSize);
// Preconditions.checkArgument(isStart,
// "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
public void start() {
// 初始化桶队列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
TimeUnit.SECONDS);
isStart = true;
}
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException,
InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
.maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
String data = "xxxx";// 四个字节
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
限流某个接口的时间窗请求数
即一个时间窗口内的请求数,如想限制某个接口/服务每秒/每分钟/每天的请求数/调用量。如一些基础服务会被很多其他系统调用,比如商品详情页服务会调用基础商品服务调用,但是怕因为更新量比较大将基础服务打挂,这时我们要对每秒/每分钟的调用量进行限速;一种实现方式如下所示:
LoadingCache<Long, AtomicLong> counter = CacheBuilder.newBuilder()
.expireAfterWrite(2, TimeUnit.SECONDS)
.build(new CacheLoader<Long, AtomicLong>() {
@Override
public AtomicLong load(Long seconds) throws Exception {
return new AtomicLong(0);
}
});
long limit = 1000;
while (true) {
// 得到当前秒
long currentSeconds = System.currentTimeMillis() / 1000;
if (counter.get(currentSeconds).incrementAndGet() > limit) {
System.out.println("限流了:" + currentSeconds);
continue;
}
// 业务处理
System.out.println("业务处理");
}
我们使用Guava的Cache来存储计数器,过期时间设置为2秒(保证1秒内的计数器是有的),然后我们获取当前时间戳然后取秒数来作为KEY进行计数统计和限流,这种方式也是简单粗暴,刚才说的场景够用了。
配置的存储
- Apollo
- MySQL
- 代码
做个功能开关,开关放在Apollo中,配置放在MySQL
DEMO
测试代码:
package com.riddle.RateLimiter;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.concurrent.TimeUnit;
/**
* @author w00585603
*/
public class RequestLimiter {
/**
* 每秒发送请求限制
*/
private static final String REQUEST_LIMIT_PER_SECONDS = "2";
private static RateLimiter rateLimiter;
public static void init(String limit) {
int requestLimit = NumberUtils.toInt(limit, 5);
rateLimiter = RateLimiter.create(requestLimit);
System.out.println("REQUEST LIMIT PER SECONDS: " + rateLimiter.getRate());
}
/**
* 模拟外部接口调用
*/
public static String job(int req) throws Exception {
int rsp = req * 2 + req;
Thread.sleep(100L);
return String.valueOf(rsp);
}
public static void main(String[] args) throws Exception {
init(REQUEST_LIMIT_PER_SECONDS);
Thread.sleep(1000L);
for (int i = 0; i < 50; ++i) {
// rateLimiter.acquire();
// System.out.println("req: " + i + " ,rsp is: " + job(i));
if (!rateLimiter.tryAcquire(100L, TimeUnit.MILLISECONDS)) {
System.out.println("req: " + i + " ,rsp is: null");
} else {
System.out.println("req: " + i + " ,rsp is: " + job(i));
}
Thread.sleep(200L);
}
}
}
输出:
REQUEST LIMIT PER SECONDS: 2.0
req: 0 ,rsp is: 0
req: 1 ,rsp is: 3
req: 2 ,rsp is: 6
req: 3 ,rsp is: 9
req: 4 ,rsp is: 12
req: 5 ,rsp is: 15
req: 6 ,rsp is: 18
req: 7 ,rsp is: null
req: 8 ,rsp is: 24
req: 9 ,rsp is: null
req: 10 ,rsp is: 30
req: 11 ,rsp is: null
req: 12 ,rsp is: 36
req: 13 ,rsp is: 39
req: 14 ,rsp is: null
req: 15 ,rsp is: 45
req: 16 ,rsp is: null
req: 17 ,rsp is: 51
req: 18 ,rsp is: null
req: 19 ,rsp is: 57
req: 20 ,rsp is: 60
req: 21 ,rsp is: null
req: 22 ,rsp is: 66
req: 23 ,rsp is: null
req: 24 ,rsp is: 72
req: 25 ,rsp is: null
req: 26 ,rsp is: 78
req: 27 ,rsp is: 81
req: 28 ,rsp is: null
req: 29 ,rsp is: 87
req: 30 ,rsp is: null
req: 31 ,rsp is: 93
req: 32 ,rsp is: null
req: 33 ,rsp is: 99
req: 34 ,rsp is: 102
req: 35 ,rsp is: null
req: 36 ,rsp is: 108
req: 37 ,rsp is: null
req: 38 ,rsp is: 114
req: 39 ,rsp is: null
req: 40 ,rsp is: 120
req: 41 ,rsp is: 123
req: 42 ,rsp is: null
req: 43 ,rsp is: 129
req: 44 ,rsp is: null
req: 45 ,rsp is: 135
req: 46 ,rsp is: null
req: 47 ,rsp is: 141
req: 48 ,rsp is: 144
req: 49 ,rsp is: null
进程已结束,退出代码为 0