并发:
- Guava集合处理是很强大的(这些在jdk8中都有些引入),但Guava发光的地方是并发。
Monitor
- Monitor实现同步
/**
* 通过Monitor的Guard进行条件阻塞
*/
public class MonitorSample {
private List<String> list = new ArrayList<String>();
private static final int MAX_SIZE = 10;
private Monitor monitor = new Monitor();
private Monitor.Guard listBelowCapacity = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return list.size() < MAX_SIZE;
}
};
public void addToList(String item) throws InterruptedException {
monitor.enterWhen(listBelowCapacity); //Guard(形如Condition),不满足则阻塞,而且我们并没有在Guard进行任何通知操作
try {
list.add(item);
} finally {
monitor.leave();
}
}
}
- Monitor就像java本土的synchronized, ReentrantLock一样,每次只运行一个线程占用,且可重占用,每一次占用会对应一次退出占用。
Monitor最佳实践:
- 就如上面,我们通过if条件来判断是否可进入Monitor代码块,并再try/finally中释放:
if (monitor.enterIf(guardCondition)) {
try {
doWork();
} finally {
monitor.leave();
}
}
其他的Monitor访问方法:
Monitor.enter //进入Monitor块,将阻塞其他线程知道Monitor.leave
Monitor.tryEnter //尝试进入Monitor块,true表示可以进入, false表示不能,并且不会一直阻塞
Monitor.tryEnterIf //根据条件尝试进入Monitor块
这些方法都有对应的限时版本。
ListenableFuture类
- jdk5之后有了Future这种异步执行的结构
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(new Callable<Integer>(){
public Integer call() throws Exception{
return service.getCount();
} });
//Retrieve the value of computation
Integer count = future.get();
- ListenableFuture对Future进行了扩展,允许注册一个回调函数,task执行完后自动调用。
- 获取ListableFuture对象。
正如我们获取Future对象要通过ExecutorService.submit(Callable)来获取一样,我们可以这样创建ListenableFuture对象:
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS)); //包装Executors创建的线程池
ListenableFuture<String> listenableFuture = executorService.submit(new Callable<String>()...); //获取ListableFuture对象
listenableFuture.addListener(new Runnable() {
@Override
public void run() {
methodToRunOnFutureTaskCompletion();
}
}, executorService); //注册回调函数
FutureCallback类
- FutureCallback定义了onSuccess和onFailure方法,onSuccess方法会接收一个Future对象,这样我们就可以获取Future的结果。
- 首先需要一个FutureCallback实现类。
/**
* 定义一个FutureCallBack实现类
*/
public class FutureCallbackImpl implements FutureCallback<String> {
private StringBuilder builder = new StringBuilder();
@Override
public void onSuccess(String result) {
builder.append(result).append(" successfully");
}
@Override
public void onFailure(Throwable t) {
builder.append(t.toString());
}
public String getCallbackResult() {
return builder.toString();
}
}
使用实例:
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> futureTask = executorService.submit(new Callable<String>() { //创建ListenaleFuture对象
@Override
public String call() throws Exception {
return "Task completed";
}
});
FutureCallbackImpl callback = new FutureCallbackImpl();
Futures.addCallback(futureTask, callback); //添加回调
callback.getCallbackResult(); //获取结果
如果CallBack是一个耗时操作,你应该选择另一个注册CallBack:
Futures.addCallback(futureTask,callback,executorService); //提供另一个线程池来执行性回调
SettableFuture类:
SettableFuture可以用来设置要返回得值:
SettableFuture<String> sf = SettableFuture.create();
//Set a value to return
sf.set("Success");
//Or set a failure Exception
sf.setException(someException);
AsyncFunction:
- 该接口与函数式编程密切相关, 类似Function, 但apply方法会转换成一个ListenableFuture封装的范型对象。
public class AsyncFuntionSample implements AsyncFunction<Long, String> {
private ConcurrentMap<Long, String> map = Maps.newConcurrentMap();
private ListeningExecutorService listeningExecutorService;
@Override
public ListenableFuture<String> apply(final Long input) throws Exception {
if (map.containsKey(input)) {
SettableFuture<String> listenableFuture = SettableFuture.create(); //构建一个SettableFuture
listenableFuture.set(map.get(input));
return listenableFuture;
} else {
return listeningExecutorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
String retrieved = //compute to get the data;
map.putIfAbsent(input, retrieved);
return retrieved;
}
});
}
}
}
FutureFallback类:
- FutureFallback用于异常恢复的备份。
/**
* 当Future任务失败后, 作为备份的Future
*/
public class FutureFallbackImpl implements FutureFallback<String> {
@Override
public ListenableFuture<String> create(Throwable t) throws Exception {
if (t instanceof FileNotFoundException) {
SettableFuture<String> settableFuture = SettableFuture.create();
settableFuture.set("Not Found");
return settableFuture;
}
throw new Exception(t);
}
}
Futures类:
- Futures类是有关Future实例的一个工具类。
异步转换:
ListenableFuture<Person> lf = Futures.transform(ListenableFuture<String> f,AsyncFunction<String,Person> af);
使用FutureFallbacks:
ListenableFuture<String> lf = Futures.withFallback(ListenableFuture<String> f,FutureFallback<String> fb);
RateLimiter:
- RateLimiter限制访问每秒访问资源的线程数。有点类似信号量Semaphore。
RateLimiter limiter = RateLimiter.create(4.0); //每秒不超过4个任务被提交
limiter.acquire(); //请求RateLimiter, 超过permits会被阻塞
executor.submit(runnable); //提交任务
也有非阻塞式地尝试:
If(limiter.tryAcquire()){ //未请求到limiter则立即返回false
doSomething();
}else{
doSomethingElse();
}
来源:http://my.oschina.net/indestiny/blog/219368
版权声明
本站文章、图片、视频等(除转载外),均采用知识共享署名 4.0 国际许可协议(CC BY-NC-SA 4.0),转载请注明出处、非商业性使用、并且以相同协议共享。
© 空空博客,本文链接:https://www.yeetrack.com/?p=1175
近期评论