Dubbo 并发控制-阿里云

云计算百科 xtyly 2个月前 (03-29) 18次浏览 已收录 0个评论

消费端并发控制

<dubbo:reference id="userService" interface="com.test.UserServiceBo"         group="dubbo" version="1.0.0" timeout="3000" actives="10"/>
  • 在服务消费方设置接口中每个方法并发请求个数,通过设置actives参数。
<dubbo:reference id="userService" interface="com.test.UserServiceBo"         group="dubbo" version="1.0.0" timeout="3000">                 <dubbo:method name="sayHello" actives="10" /> </dubbo:reference>
  • 在服务消费方设置接口中的某个方法的并发请求个数,通过设置actives参数。

阿里云最新优惠1折抢购,2核4G云服务器仅799元/3年,新老用户同享,立即抢购>>>

服务端并发控制

<dubbo:service interface="com.test.UserServiceBo" ref="userService"             group="dubbo"  version="1.0.0" timeout="3000" executes="10"/>
  • 在服务提供方设置接口中每个方法的并发请求数,通过设置executes参数。
<dubbo:service interface="com.test.UserServiceBo" ref="userService"             group="dubbo" version="1.0.0" timeout="3000" >             <dubbo:method name="sayHello" executes="10" /> </dubbo:service>
  • 在服务提供方设置接口中某个方法的并发请求数,通过设置executes参数。

消费端并发限制 – Ac[db:SY_tag]iveLimitFilter

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY) public class ActiveLimitFilter implements Filter {      @Override     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {         URL url = invoker.getUrl();         String methodName = invocation.getMethodName();         int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);         // 获取方法级别的并发限制的RpcStatus对象         RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());         if (max > 0) {             long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);             long start = System.currentTimeMillis();             long remain = timeout;             int active = count.getActive();             // 通过synchronized和wait实现客户端并发限制超过时候需要等待直至超时。             if (active >= max) {                 synchronized (count) {                     while ((active = count.getActive()) >= max) {                         try {                             count.wait(remain);                         } catch (InterruptedException e) {                         }                         long elapsed = System.currentTimeMillis() - start;                         remain = timeout - elapsed;                         if (remain <= 0) {                             throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "                                     + invoker.getInterface().getName() + ", method: "                                     + invocation.getMethodName() + ", elapsed: " + elapsed                                     + ", timeout: " + timeout + ". concurrent invokes: " + active                                     + ". max concurrent invoke limit: " + max);                         }                     }                 }             }         }          try {             long begin = System.currentTimeMillis();             // 累加方法级别的并发数             RpcStatus.beginCount(url, methodName);             try {                 // 执行方法调用                 Result result = invoker.invoke(invocation);                 // 递减方法级别的并发数                 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);                  return result;             } catch (RuntimeException t) {                 // 递减方法级别的并发数                 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);                 throw t;             }         } finally {             // 针对有并发限制的通过notify进行唤醒             if (max > 0) {                 synchronized (count) {                     count.notify();                 }             }         }     }  }
  • 1、首先会去获得服务消费端每服务每方法最大可并行执行请求数。
  • 2、如果方法设置并发请求数就需要判断是否超并发数,超过并发数就等待直至超时。
  • 3、按照累加并发数、执行方法、递减并发数,最后进行唤醒炒作。
  • 4、 消费端设置actives时候会等待直至超时。

服务端并发限制 – Execu[db:SY_tag]eLimitFilter

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY) public class ExecuteLimitFilter implements Filter {      @Override     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {         URL url = invoker.getUrl();         String methodName = invocation.getMethodName();         Semaphore executesLimit = null;         boolean acquireResult = false;         int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);         if (max > 0) {             RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); //            if (count.getActive() >= max) {             /**              * http://manzhizhen.iteye.com/blog/2386408              * use semaphore for concurrency control (to limit thread number)              */             executesLimit = count.getSemaphore(max);             // 服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常。             if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {                 throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes="" + max + "" /> limited.");             }         }         long begin = System.currentTimeMillis();         boolean isSuccess = true;         RpcStatus.beginCount(url, methodName);         try {             Result result = invoker.invoke(invocation);             return result;         } catch (Throwable t) {             isSuccess = false;             if (t instanceof RuntimeException) {                 throw (RuntimeException) t;             } else {                 throw new RpcException("unexpected exception when ExecuteLimitFilter", t);             }         } finally {             RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);             // 如果需要获取过信号量就进行释放动作。             if(acquireResult) {                 executesLimit.release();             }         }     } }
  • 1、首先会去获得服务提供者每服务每方法最大可并行执行请求数。
  • 2、如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例。
  • 3、通过RpcStatus实例获取一个信号量,若果获取的这个信号量调用tryAcquire返回false,则抛出异常。
  • 4、如果没有抛异常,那么久调用RpcStatus静态方法beginCount,给这个 URL + 方法维度开始计数。
  • 5、调用服务。
  • 6、调用结束后计数调用RpcStatus静态方法endCount,计数结束。
  • 7、释放信号量。
  • 8、需要注意的是,服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常。

并发限制实现核心 – Rpc[db:SY_tag]tatus

public class RpcStatus {     // service级别的并发限制全局变量     private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();     // method级别的并发限制全局变量     private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();     private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();     // 记录活跃的记录     private final AtomicInteger active = new AtomicInteger();     private final AtomicLong total = new AtomicLong();     private final AtomicInteger failed = new AtomicInteger();     private final AtomicLong totalElapsed = new AtomicLong();     private final AtomicLong failedElapsed = new AtomicLong();     private final AtomicLong maxElapsed = new AtomicLong();     private final AtomicLong failedMaxElapsed = new AtomicLong();     private final AtomicLong succeededMaxElapsed = new AtomicLong();     // 用于记录服务提供端的限制     private volatile Semaphore executesLimit;     private volatile int executesPermits;      private RpcStatus() {     }      // 获取service级别的并发限制变量     public static RpcStatus getStatus(URL url) {         String uri = url.toIdentityString();         RpcStatus status = SERVICE_STATISTICS.get(uri);         if (status == null) {             SERVICE_STATISTICS.putIfAbsent(uri, new RpcStatus());             status = SERVICE_STATISTICS.get(uri);         }         return status;     }      // 获取method级别的并发限制变量,根据service=>method顺序查找     public static RpcStatus getStatus(URL url, String methodName) {         String uri = url.toIdentityString();         ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);         if (map == null) {             METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());             map = METHOD_STATISTICS.get(uri);         }         RpcStatus status = map.get(methodName);         if (status == null) {             map.putIfAbsent(methodName, new RpcStatus());             status = map.get(methodName);         }         return status;     }      // 累加service 和 method的并发限制变量     public static void beginCount(URL url, String methodName) {         beginCount(getStatus(url));         beginCount(getStatus(url, methodName));     }     // 并发限制变量的原子累加     private static void beginCount(RpcStatus status) {         status.active.incrementAndGet();     }     // 递减service 和 method的并发限制变量     public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {         endCount(getStatus(url), elapsed, succeeded);         endCount(getStatus(url, methodName), elapsed, succeeded);     }      private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {         status.active.decrementAndGet();         status.total.incrementAndGet();         status.totalElapsed.addAndGet(elapsed);         if (status.maxElapsed.get() < elapsed) {             status.maxElapsed.set(elapsed);         }         if (succeeded) {             if (status.succeededMaxElapsed.get() < elapsed) {                 status.succeededMaxElapsed.set(elapsed);             }         } else {             status.failed.incrementAndGet();             status.failedElapsed.addAndGet(elapsed);             if (status.failedMaxElapsed.get() < elapsed) {                 status.failedMaxElapsed.set(elapsed);             }         }     }      public Semaphore getSemaphore(int maxThreadNum) {         if(maxThreadNum <= 0) {             return null;         }          if (executesLimit == null || executesPermits != maxThreadNum) {             synchronized (this) {                 if (executesLimit == null || executesPermits != maxThreadNum) {                     executesLimit = new Semaphore(maxThreadNum);                     executesPermits = maxThreadNum;                 }             }         }          return executesLimit;     } }
  • service级别的并发限制全局变量 private static final ConcurrentMap SERVICE_STATISTICS。
  • method级别的并发限制全局变量 private static final ConcurrentMap> METHOD_STATISTICS。
  • 记录活跃的记录 private final AtomicInteger active。

阿里云百科网 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:Dubbo 并发控制-阿里云
喜欢 (0)
[[email protected]]
分享 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址