CompletableFuture封装异步工具
注: 其中的EagleAsyncRunnable和EagleEyeFutureTask需要自己定义,分别继承Runnable和FutureTask,方便实现在异步线程内要做的特殊动作,如在上下文context中放入分布式trarceId,便于排查异步问题
public class AsyncUtils {
/**
* 根据参数生成一个线程池
*
* @param coreThreadNum 核心线程数
* @param maxThreadNum 最大线程数
* @param aliveTime 线程池存活时间 - 单位秒
* @param workQueue 工作队列 - 存储线程需要执行的任务
* @param threadNamePrefix 自定义线程的名字前缀
* @return 自定义线程池
*/
public static ThreadPoolExecutor makeThreadPoolExecutor(int coreThreadNum,
int maxThreadNum,
int aliveTime,
BlockingQueue<Runnable> workQueue,
String threadNamePrefix) {
return new ThreadPoolExecutor(coreThreadNum, maxThreadNum, aliveTime, TimeUnit.SECONDS, workQueue,
new DefaultThreadFactory(threadNamePrefix, true),
new ThreadPoolExecutor.CallerRunsPolicy()) {
@Override
public @NotNull Future<?> submit(@NotNull Runnable task) {
return super.submit(new EagleAsyncRunnable(task));
}
@Override
public void execute(@NotNull Runnable runnable) {
super.execute(new EagleAsyncRunnable(runnable));
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//do something
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new EagleEyeFutureTask<>(runnable, value);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new EagleEyeFutureTask<>(callable);
}
};
}
/**
* 线程池
*/
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = makeThreadPoolExecutor(AsyncSwitch.CORE_THREAD_NUM,
AsyncSwitch.MAX_THREAD_NUM, AsyncSwitch.KEEP_ALIVE_TIME_IN_SECOND,
new SynchronousQueue<>(false), "MarketingGuideAsync");
/**
* 异步执行- 带有返回值 CompleteFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return CompletableFuture.supplyAsync(supplier, THREAD_POOL_EXECUTOR);
}
/**
* 异步执行- 不带有返回值 CompleteFuture
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return CompletableFuture.runAsync(runnable, THREAD_POOL_EXECUTOR);
}
public static <U> List<U> supplyAllAsync(List<Supplier<U>> supplierList) {
if (CollectionUtils.isEmpty(supplierList)) {
return Collections.emptyList();
}
List<CompletableFuture<U>> futureList = supplierList.stream()
.map(AsyncUtils::supplyAsync)
.collect(Collectors.toList());
try {
CompletableFuture<Void> allOf = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
allOf.get();
} catch (Exception e) {
log.error("waitAllAndGetResult error.{}", e.getMessage(), e);
}
return futureList.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
ThreadPool封装异步工具
public class ThreadPoolUtils {
private static final ThreadPoolExecutor MONTH_CARD_EXECUTOR = new ThreadPoolExecutor(
50,
150,
1000L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2000),
new ThreadFactoryBuilder().setNameFormat("MonthCardPool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
public static <T> Future<T> submit(Callable<T> callable) {
return MONTH_CARD_EXECUTOR.submit(new EagleEyeCallable<>(callable));
}
public static <T> List<Future<T>> submitAll(List<Callable<T>> callables) {
List<EagleEyeCallable<T>> list = callables.stream().map(EagleEyeCallable::new).collect(Collectors.toList());
try {
return MONTH_CARD_EXECUTOR.invokeAll(list);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
评论