菜单开关

周梦康 发表于 2021-12-05 132 次浏览 标签 : SpringBoot

项目完整的代码地址 项目完整代码见:https://gitee.com/zhoumengkang/wechat-demo/tree/master/strace02

场景升级

上节解决了单一线程的 trace id 传递,如果子线程和线程池怎么办呢,还有 rpc 远程调用,怎么玩呢?

我们在做项目中肯定有很多时候希望通过异步的方式来提升接口的响应速度,但是我们在开发的时候很容易忽略 trace 信息的丢失。我们将上节的用户信息的获取拆成两个方法,并且都非常耗时,彼此不相互依赖,所以可以该为并行去处理。

@Service
@Slf4j
public class UserService {

    public Integer getAge(Long id){
        log.info("getAge id:{}",id);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 18;
    }

    public String getName(Long id){
        log.info("getName id:{}",id);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "加班写Bug";
    }
}
@GetMapping("/{id}")
public ResponseData<UserDTO> detail(@PathVariable Long id) {
    Preconditions.checkNotNull(id, "id is null");

    UserDTO userDTO = new UserDTO();
    userDTO.setId(id);

    CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> userService.getAge(id));

    userDTO.setUsername(userService.getName(id));

    try {
        userDTO.setAge(ageFuture.get());
    } catch (Exception e) {
        log.error("user service error:{}", e.getMessage(), e);
    }

    return ResponseData.success(userDTO);
}

访问 http://localhost:8080/user/1001 查看日志,发现线程池打印的时候没有 trace id

2021-12-05 10:57:47.511  INFO 58592 --- [http-nio-8080-exec-1] [a71e8fad-0284-4d66-97fe-bb06d2dbc37c] com.example.demo.aop.ControllerHandler   : UserController#detail args:{"id":1001}
2021-12-05 10:57:47.583  INFO 58592 --- [http-nio-8080-exec-1] [a71e8fad-0284-4d66-97fe-bb06d2dbc37c] com.example.demo.service.UserService     : getName id:1001
2021-12-05 10:57:47.584  INFO 58592 --- [ForkJoinPool.commonPool-worker-1] [] com.example.demo.service.UserService     : getAge id:1001
2021-12-05 10:57:48.600  INFO 58592 --- [http-nio-8080-exec-1] [a71e8fad-0284-4d66-97fe-bb06d2dbc37c] com.example.demo.aop.ControllerHandler   : UserController#detail response:{"code":200,"data":{"age":18,"id":1001,"username":"加班写Bug"},"message":"OK","success":true,"traceId":"a71e8fad-0284-4d66-97fe-bb06d2dbc37c"}

解决方案

方案1 使用 CompletableFutureWrapper

增加一个自定义线程池

@Configuration
public class AsyncExecutorConfiguration {

    @Bean(name = "asyncExecutor1")
    public ExecutorService asyncExecutor1Service() {
        return new ThreadPoolExecutor(
                10,
                100,
                60L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(128),
                new ThreadFactoryBuilder().setNameFormat("async-executor-1-pool-%d").build(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

CompletableFuture 进行一次包装,将 MDC 的上下文传递到子线程中去。

@Component
public class CompletableFutureWrapper {

    @Autowired
    @Qualifier("asyncExecutor1")
    private ExecutorService asyncExecutorService;

    public interface Task<U> {
        U callback();
    }

    public <U> CompletableFuture<U> supplyAsync(Task<U> task) {

        Map<String, String> contextMap = MDC.getCopyOfContextMap();

        return CompletableFuture.supplyAsync(() -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }

                return task.callback();
            } finally {
                MDC.clear();
            }
        }, asyncRpcExecutorService);
    }
}
CompletableFuture<Integer> ageFuture = completableFutureWrapper.supplyAsync(() -> userService.getAge(id));

重启然后再观察日志

2021-12-12 21:02:25.252  INFO 29358 --- [http-nio-8080-exec-2] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.aop.ControllerHandler   : UserController#detail args:{"id":1001}
2021-12-12 21:02:25.260  INFO 29358 --- [http-nio-8080-exec-2] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.service.UserService     : getName id:1001
2021-12-12 21:02:25.260  INFO 29358 --- [async-executor-1-pool-0] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.service.UserService     : getAge id:1001
2021-12-12 21:02:26.277  INFO 29358 --- [http-nio-8080-exec-2] [6f4c80ca-daf4-4a1d-93c6-21bd3212f215] com.example.demo.aop.ControllerHandler   : UserController#detail response:{"code":200,"data":{"age":18,"id":1001,"sex":null,"username":"加班写Bug"},"message":"OK","success":true,"traceId":"6f4c80ca-daf4-4a1d-93c6-21bd3212f215"}

方案2 使用 ThreadPoolExecutorMdcWrapper

在 AsyncExecutorConfiguration 中再增加一个自定义线程池

@Bean(name ="asyncExecutor2")
public ExecutorService asyncExecutor2Service() {
    return new ThreadPoolExecutorMdcWrapper(
            10,
            100,
            60,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(128),
            new ThreadFactoryBuilder().setNameFormat("async-executor-2-pool-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
}

然后继承 ThreadPoolExecutor

public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                                        RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    private <T> Callable<T> wrap(final Callable<T> callable) {
        Map<String, String> context = MDC.getCopyOfContextMap();

        return () -> {
            if (context != null) {
                MDC.setContextMap(context);
            }

            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    private Runnable wrap(final Runnable runnable) {
        Map<String, String> context = MDC.getCopyOfContextMap();

        return () -> {
            if (context != null) {
                MDC.setContextMap(context);
            }

            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return super.submit(wrap(task), result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(wrap(task));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task));
    }
}

使用上,就可以

...

@Autowired
@Qualifier("asyncExecutor2")
ExecutorService executorService;

...

Future<Byte> sexFuture = executorService.submit(()-> studentService.getSex(id));

方案3 ThreadPoolTaskExecutor 配置适配器 MdcTaskDecorator

在 AsyncExecutorConfiguration 中再增加一个 ThreadPoolTaskExecutor 线程池

@Bean(name ="taskExecutor")
public Executor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(128);
    executor.setThreadNamePrefix("task-executor-pool-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setTaskDecorator(new MdcTaskDecorator());
    executor.initialize();

    return executor;
}

然后配置适配器 MdcTaskDecorator

public class MdcTaskDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable runnable) {

        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return () -> {
            try {
                if (contextMap != null) {
                    MDC.setContextMap(contextMap);
                }
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}
@Service
@Slf4j
public class StudentService {

    @Async("taskExecutor")
    public Future<Integer> getAge(Long id){
        log.info("getAge id:{}",id);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return AsyncResult.forValue(18);
    }
}

使用方式

Future<Integer> ageFuture = studentService.getAge(id);

但是很多时候某个方法,有时候需要同步调用,有时候是异步调用,个人觉得还是方案 1 和 方案 2 更灵活点。

监控自定义线程池

记得对我们增加的线程池进行监控,当出现线程池排队,导致服务性能降低的时候,即使对线程池进行调整。以方案 1 中的线程池名为例

$ jstack 20275|grep async-executor-1-pool
"async-executor-1-pool-0" #36 prio=5 os_prio=31 tid=0x00007fe92badd800 nid=0x520f waiting on condition [0x00007000110d9000]
...
$ jstack 20275|grep async-executor-1-pool|grep waiting|wc -l
100
$ jstack 20275|grep async-executor-1-pool|grep runnable|wc -l
0

其他

除了线程上 trace 信息的传递,还要 http 、自定义 rpc 协议之间的传递都需要透传 trace 信息,比如 http 就可以设置在 header 信息里,rpc 也可以专门在 rpc head 协议中设置 trace 字段。设置还可以在 sql 、redis 执行上都可以记录下。

👇 下面是我的公众号,高质量的博文我会第一时间同步到公众号,给个关注吧!

评论列表