异步工具封装

CompletableFuture封装异步工具

注: 其中的EagleAsyncRunnable和EagleEyeFutureTask需要自己定义,分别实现Runnable接口和继承FutureTask,方便实现在异步线程内要做的特殊动作,如在上下文context中放入分布式traceId,便于排查异步问题

public class AsyncUtils {

    /**
     * 根据参数生成一个线程池
     *
     * @param coreThreadNum    核心线程数
     * @param maxThreadNum     最大线程数
     * @param aliveTime        线程池存活时间 - 单位秒
     * @param workQueue        工作队列 - 存储线程需要执行的任务
     * @param threadNamePrefix 自定义线程的名字前缀
     * @return 自定义线程池
     */
    public static ThreadPoolExecutor makeThreadPoolExecutor(int coreThreadNum,
                                                            int maxThreadNum,
                                                            int aliveTime,
                                                            BlockingQueue<Runnable> workQueue,
                                                            String threadNamePrefix) {

        return new ThreadPoolExecutor(coreThreadNum, maxThreadNum, aliveTime, TimeUnit.SECONDS, workQueue,
                                      new DefaultThreadFactory(threadNamePrefix, true),
                                      new ThreadPoolExecutor.CallerRunsPolicy()) {
            @Override
            public @NotNull Future<?> submit(@NotNull Runnable task) {
                return super.submit(new EagleAsyncRunnable(task));
            }

            @Override
            public void execute(@NotNull Runnable runnable) {
                super.execute(new EagleAsyncRunnable(runnable));
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                //do something
            }

            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                return new EagleEyeFutureTask<>(runnable, value);
            }

            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                return new EagleEyeFutureTask<>(callable);
            }
        };
    }

    /**
     * 线程池
     */
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = makeThreadPoolExecutor(AsyncSwitch.CORE_THREAD_NUM,
                                                                                          AsyncSwitch.MAX_THREAD_NUM, AsyncSwitch.KEEP_ALIVE_TIME_IN_SECOND,
                                                                                          new SynchronousQueue<>(false), "MarketingGuideAsync");

    /**
     * 异步执行- 带有返回值 CompleteFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {

        return CompletableFuture.supplyAsync(supplier, THREAD_POOL_EXECUTOR);
    }

    /**
     * 异步执行- 不带有返回值 CompleteFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {

        return CompletableFuture.runAsync(runnable, THREAD_POOL_EXECUTOR);
    }

    public static <U> List<U> supplyAllAsync(List<Supplier<U>> supplierList) {
        if (CollectionUtils.isEmpty(supplierList)) {
            return Collections.emptyList();
        }

        List<CompletableFuture<U>> futureList = supplierList.stream()
            .map(AsyncUtils::supplyAsync)
            .collect(Collectors.toList());
        try {
            CompletableFuture<Void> allOf = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
            allOf.get();
        } catch (Exception e) {
            log.error("waitAllAndGetResult error.{}", e.getMessage(), e);
        }

        return futureList.stream()
            .map(CompletableFuture::join)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
}

ThreadPool封装异步工具

/**
 * Static helpers for submitting {@link Callable} tasks to a shared thread pool.
 *
 * <p>Each callable is wrapped in {@code EagleEyeCallable} before it is handed to the pool;
 * that project-defined wrapper is expected to carry caller context into the worker thread.
 */
public class ThreadPoolUtils {

    /**
     * Shared pool: 50 core threads, at most 150 threads, 1000 ms keep-alive and a bounded queue
     * of 2000 tasks. Once the pool is saturated the submitting thread runs the task itself
     * ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     */
    private static final ThreadPoolExecutor MONTH_CARD_EXECUTOR = new ThreadPoolExecutor(
        50,
        150,
        1000L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(2000),
        new ThreadFactoryBuilder().setNameFormat("MonthCardPool-%d").build(),
        new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Submits a single task to the shared pool.
     *
     * @param callable task to run
     * @param <T>      result type
     * @return a future representing the pending result
     */
    public static <T> Future<T> submit(Callable<T> callable) {
        return MONTH_CARD_EXECUTOR.submit(new EagleEyeCallable<>(callable));
    }

    /**
     * Submits all tasks to the shared pool and blocks until every one of them has completed.
     *
     * @param callables tasks to run
     * @param <T>       result type
     * @return one future per task, in the same order as {@code callables}
     * @throws RuntimeException wrapping the underlying failure, including the
     *                          {@link InterruptedException} raised when the calling thread is
     *                          interrupted while waiting (the interrupt flag is restored first)
     */
    public static <T> List<Future<T>> submitAll(List<Callable<T>> callables) {
        List<EagleEyeCallable<T>> wrapped = callables.stream()
            .map(EagleEyeCallable::new)
            .collect(Collectors.toList());
        try {
            return MONTH_CARD_EXECUTOR.invokeAll(wrapped);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Throwable e) {
            // Kept for backward compatibility: every other failure is still wrapped as before.
            throw new RuntimeException(e);
        }
    }
}
end

评论