Dubbo RPC调用链追踪接入Jaeger

图片来自pixabay.com的NickRivers会员

1. 介绍

微服务架构中,分布式追踪(distributed tracing)是一个关键的基础功能,通过分布式追踪技术,我们可以深入分析一次请求调用所执行的路径、性能消耗,帮助定位性能瓶颈点,透明化服务之间上下游网络调用关系,帮助优化服务层次依赖问题。

Dubbo RPC是业界常用的一个开源RPC框架,而Jaeger也是业界流行的一个开源分布式追踪组件,本文将介绍如何把Dubbo RPC调用链接入Jaeger追踪,文末将结合分布式追踪技术,对常见的问题案例进行分析和给出优化方案。

2. 追踪数据模型和dubbo埋点实现原理

Jaeger的追踪实现遵循了OpenTracing语义规范,其数据模型是一个trace包含多个span构成的有向无环图,如下是一个典型的trace样例,

        [Span A]  ←←←(the root span)
            |
     +------+------+
     |             |
 [Span B]      [Span C] ←←←(Span C is a `ChildOf` Span A)
     |             |
 [Span D]      +---+-------+
               |           |
           [Span E]    [Span F] >>> [Span G] >>> [Span H]
                                       ↑
                                       ↑
                                       ↑
                         (Span G `FollowsFrom` Span F)

更多OpenTracing术语的定义和详细介绍请见这里,本文将不赘述。

若要将Dubbo RPC接入Jaeger追踪,从consumer到provider一次调用为最基本的追踪单元,整个dubbo的调用追踪可以视为该基本单元的规模扩展,因此该基本单元的追踪埋点实现是Dubbo RPC接入Jaeger追踪的核心。

下图为对这个基本追踪单元的埋点实现描述,

3. 代码实现

下面为简化版的RpcConsumerFilter实现代码,

@Activate(group = {Constants.CONSUMER})
public class RpcConsumerFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // 获取context并创建span
        RpcContext rpcContext = RpcContext.getContext();
        Span span = DubboTraceUtil.extractTraceFromLocalCtx(rpcContext);

        Result result = null;    
        try {
            // 将span context加载到dubbo rpc remote context中
            DubboTraceUtil.attachTraceToRemoteCtx(span, rpcContext);

            // 执行dubbo rpc调用
            result = invoker.invoke(invocation);
        } catch (RpcException rpcException) {
            span.setTag("error", "1");
            throw rpcException;
        } finally {
            span.finish();
        }

        return result;
    }
}

下面为简化版的RpcProviderFilter实现代码,


@Activate(group = {Constants.PROVIDER}) public class RpcProviderFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { // 获取context并创建span RpcContext rpcContext = RpcContext.getContext(); Span span = DubboTraceUtil.extractTraceFromRemoteCtx(rpcContext); Result result = null; try { // 将span context加载到dubbo rpc local context中 DubboTraceUtil.attachTraceToLocalCtx(span, rpcContext); // 执行dubbo rpc调用 result = invoker.invoke(invocation); } catch (RpcException rpcException) { span.setTag("error", "1"); throw rpcException; } finally { span.finish(); } return result; }

完整的代码演示样例请见这里

4. 埋点上报的span数据格式

数据字段 字段类型 是否必要字段 说明
traceId 字符串 Y 当前span所属的一次调用跟踪ID
spanID 字符串 Y 当前span的ID
parentSpanID 字符串 Y 父spanID,串联上下游span
startTime 长整型 Y 当前span的开始时间
duration 长整型 Y 当前span的时长
tags.span.kind 字符串 Y 当前调用方类型:server/client
tags.sampler.type 字符串 抽样器类型
tags.sampler.param 字符串 抽样比例,取值范围:0-1
tags.peer.hostname 字符串 对调方的主机名/IP
tags.peer.port 短整型 对调方的端口
operationName 字符串 Y dubbo接口名
tags.arguments 字符串 Y dubbo接口调用的参数
tags.error 字符串 Y* 当前dubbo调用出现异常或错误,值为“1”,注:该字段只有在有错误异常情况下为必要字段
tags.error.code 字符串 dubbo的异常码
tags.error.message 字符串 dubbo的异常消息

如下分别为来自dubbo consumer/provider的span上报数据样例,

// 消费方span
{
    "traceID": "5eb2e61e850be731",
    "spanID": "5eb2e61e850be731",
    "parentSpanID": "0",
    "startTime": 1592835612981000,
    "duration": 9781,
    "operationName": "com.pphh.demo.common.service.SimpleService:save",
    "tags.span.kind": "client",
    "tags.sampler.type": "probabilistic",
    "tags.peer.service": "com.pphh.demo.common.service.SimpleService",
    "tags.sampler.param": "1.0",
    "tags.arguments": "[{\"userName\":\"michael\"}]",
    "tags.peer.hostname": "192.168.1.105",
    "tags.peer.port": "29001"
}

// 提供方span
{
    "traceID": "5eb2e61e850be731",
    "spanID": "c7aaec2ab0a20823",
    "parentSpanID": "5eb2e61e850be731",
    "startTime": 1592835612986000,
    "duration": 1880,
    "operationName": "com.pphh.demo.common.service.SimpleService:save",
    "tags.span.kind": "server",
    "tags.peer.service": "com.pphh.demo.common.service.SimpleService",
    "tags.arguments": "[{\"userName\":\"michael\"}]",
    "tags.peer.hostname": "192.168.1.105",
    "tags.peer.port": "49707"
}

5. Dubbo RPC分布式追踪大图

将所有dubbo rpc调用的上报span数据按应用聚合,可以看到整个Dubbo RPC分布式追踪大图,

6. 通过分布式追踪发现的常见问题案例

6.1 应用服务依赖:多次重复rpc调用/上下依赖倒置

通过分布式追踪大图可以清晰地看到各个应用的调用关系,应避免不必要的重复rpc调用,禁止底层应用调用上层应用。

6.2 大流量调用

分布式追踪大图中,调用线条的粗细描述了各个调用关系的流量大小,

对于大流量调用,需要评估其流量的合理性,减少不必要的RPC调用开销,可以考虑从如下几个方面进行优化,
1. 循环多次调用转为单次批量调用。
2. 若是读操作,接受数据的时延,可以考虑使用local cache,在指定的N秒内,直接读取local cache。
3. 应用服务拆分,隔离因大流量调用而产生的CPU/内存/网络等资源竞争。

6.3 性能问题 - 重复调用转为单次批量调用

该问题典型场景为在一个循环中重复执行RPC调用,其调用性能取决于循环的次数,比如获取100个用户信息,循环100次获取用户信息,这个问题最好的办法是将循环调用转为单次批量调用。

下图为一个重复调用的追踪图,

6.4 异常调用定位 - 非法参数

对于异常调用,可以查看异常信息,并结合调用的参数,定位问题,比如用户名非法的异常,可以查看调用参数,用户名是否包含非法字符。

6.5 网络抖动

这种问题常见的现象是,整个调用链中,上游span耗时非常长,下游span耗时非常短,见下图,

可以看到,上下游都被执行,但是上下游衔接耗时很长,其问题的原因主要出现在上下游衔接。上图中的问题后来定位到网络抖动导致。

7. 参考资料

  1. Jaeger分布式追踪
  2. Jaeger开源代码
  3. OpenTracing语义规范

Spring事务的那些坑

图片来自pixabay.com的distelapparath会员

Spring MVC框架对数据访问事务抽象封装在spring-tx模块中,一个简单@Transactional注解就可以对一个方法调用开启事务,在使用过程中,若对Spring MVC的事务配置和原理了解不够清楚,一不小心就会落入坑中。

注:本文的代码技术讨论基于Spring Framework Tx 5.2.6.RELEASE版本 + Spring Boot 2.2.7.RELEASE版本。

1. 坑1 - 事务方法调用过程中的异常抛出

对一个方法调用开启事务,是希望在运行过程中,一旦有未知异常发生,则需要回滚数据库表的相关写操作,保证数据操作的原子性和一致性。

下面的insertUser()方法中,首先对一个user记录进行了保存写入操作,然后有一个异常被抛出,这个时候user记录是否会被回滚?

@Service
public class UserService {

    @Autowired
    UserInfoDAO userInfoDAO;

    @Transactional
    public void insertUser(UserInfoDTO user) throws Exception {
        userInfoDAO.insert(user);
        throw new Exception("Oh, an error happened");
        return user;
    }

}

答案其实是不会,user记录还是被保存到了数据库表中。其原因是@Transactional在未配置任何rollbackFor异常类时,将会执行如下缺省配置,即只对RuntimeException和Error两种异常发生时才执行回滚操作,上面的代码中抛出的异常类是Exception,未命中回滚条件。

public class DefaultTransactionAttribute extends DefaultTransactionDefinition implements TransactionAttribute {

    public boolean rollbackOn(Throwable ex) {
        return ex instanceof RuntimeException || ex instanceof Error;
    }

}

若希望在上面的情况下,回滚user记录的写入操作,一个解决办法是,在标注Transactional时指定如下的回滚异常类,

@Transactional(rollbackFor = Exception.class)

更多的Transactional注解配置说明见下文。

2. 坑2 - 内部调用方法被标注为事务

在下面的代码样例中,UserService提供了两个方法,

  • public void insertUser(UserInfoDTO user)
  • private void insertUserInner(UserInfoDTO user)

前一个方法将会调用后者,后者执行数据记录保存操作,@Transactional注解标注在后者方法上。

@RestController
public class TestController {

    @Autowired
    private UserService userService;

    @RequestMapping(value = "/test", method = RequestMethod.POST)
    public UserInfoDTO addUser(@RequestBody UserInfoDTO user) throws Exception {
        return userService.insertUser(user);
    }

}


@Service
public class UserService {

    @Autowired
    UserInfoDAO userInfoDAO;

    public void insertUser(UserInfoDTO user) throws Exception {
        insertUserInner(user);
    }

    @Transactional
    private void insertUserInner(UserInfoDTO user) throws Exception {
        userInfoDAO.insert(user);
        throw new RuntimeException("Oh, an error happened");
    }

}

若在insertUserInner方法中出现异常,上面的user记录同样也不会被回滚,原因是,
1. Transactional的实现是通过Spring AOP原理,由于Spring AOP对内部方法调用不起作用,数据事务也无法正常启动。
2. @Transactional注解只对public方法起作用,对于private/protected/package-visible的方法上是无效的。

要想对内部调用或私有方法启用数据事务,可以考虑使用可编程的数据事务

3. 坑3 - 在一个事务方法调用了另外一个事务方法

在下面的代码样例中,事务方法UserAService.insertUser()调用了另外一个UserBService.insertUser()事务方法,并通过try/catch对后者的异常进行了日志输出处理。

@Service
public class UserAService {

    @Autowired
    UserBService userBService;

    @Autowired
    UserInfoDAO userInfoDAO;

    @Transactional()
    public void insertUser(UserInfoDTO user) throws Exception {
        // 保存用户记录A
        userInfoDAO.insert(user);

        try {
            userBService.insertUser(user);
        } catch (Throwable e){
            System.out.println(e.toString());
        }
    }

}

@Service
public class UserBService {

    @Autowired
    UserInfoDAO userInfoDAO;

    @Transactional()
    public void insertUser(UserInfoDTO user) throws Exception {
        // 保存用户记录B
        userInfoDAO.insert(user);
        throw new RuntimeException("Oh, an error happened");
    }

}

在UserBService.insertUser()方法执行过程中,若有未知异常发生,用户记录A和用户记录B在数据库表中的状态是怎样?是保存,还是回滚了?

有很多人认为,由于异常被try/catch了,所以用户记录A将会被正常保存,而用户记录B会被回滚,而事实上是,两个用户记录都未被保存到。

查看日志可以看到如下的报错信息,

org.springframework.transaction.UnexpectedRollbackException: Transaction rolled back because it has been marked as rollback-only at 
org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(AbstractPlatformTransactionManager.java:870)at 
org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:707) at 
org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:385) at 
org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) at 
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at 
com.pphh.demo.service.UserAService$$EnhancerBySpringCGLIB$$73a6d5aa.insertUser(<generated>) ~[classes/:na]

这个异常日志说当前事务已回滚,无法执行当前事务。查看整个事务执行过程,就可以了解这个报错的原因,
- UserAService.insertUser被调用,开启了事务A,保存用户记录A
- UserBService.insertUser被调用,由于当前已经有存在事务,根据Transactional方法默认事务传播定义,是需要加入到当前事务A
- UserBService.insertUser异常发生,回滚当前事务A,用户记录B被回滚
- UserAService.insertUser执行完毕,准备提交用户记录A,这时发现当前事务A已经被回滚,无法执行当前事务。

要想解决这个问题,让用户记录A将会被正常保存,而用户记录B被回滚,可以将UserBService.insertUser的事务传播定义为Propagation.NEW,

public class UserBService {

    @Transactional(propagation = Propagation.NEW)
    public void insertUser(UserInfoDTO user) throws Exception {
        // 保存用户记录B
        userInfoDAO.insert(user);
        throw new RuntimeException("Oh, an error happened");
    }

}

4. 开启事务调试日志

有时候为了方便查看事务运行情况,可以通过下方法开启事务日志,

# transaction
logging.level.org.springframework.transaction.interceptor=TRACE

如下是一段Trace执行日志,可以看到由于没有回滚规则定义,直接使用了缺省规则。

2020-05-26 12:16:14.329 o.s.t.i.TransactionInterceptor           : Getting transaction for [com.pphh.demo.service.UserService.insertUser]
2020-05-26 12:16:14.490 o.s.t.i.TransactionInterceptor           : Completing transaction for [com.pphh.demo.service.UserService.insertUser] after exception: java.lang.Exception: Oh, an error happened
2020-05-26 12:16:14.490 o.s.t.i.RuleBasedTransactionAttribute    : Applying rules to determine whether transaction should rollback on java.lang.Exception: Oh, an error happened
2020-05-26 12:16:14.490 o.s.t.i.RuleBasedTransactionAttribute    : Winning rollback rule is: null
2020-05-26 12:16:14.490 o.s.t.i.RuleBasedTransactionAttribute    : No relevant rollback rule found: applying default rules

5. Transational注解配置说明

下面对Transational注解的各个配置定义进行了说明,供参考,

public @interface Transactional {

    // 指定事务管理器
    @AliasFor("transactionManager")
    String value() default "";

    // 同上
    @AliasFor("value")
    String transactionManager() default "";

    // 事务嵌套时,事务的传播定义,有如下几种,
    // Propagation.REQUIRED      -- 当前若有事务存在,则加入该事务。若无事务,则创建一个事务。
    // Propagation.SUPPORTS      -- 当前若有事务存在,则加入该事务。若无事务,则继续非事务状态运行。
    // Propagation.MANDATORY     -- 当前必须在事务中,若当前无事务,则抛出异常。
    // Propagation.REQUIRES_NEW  -- 当前方法新起一个事务。当前若有事务存在,则挂起当前事务。
    // Propagation.NOT_SUPPORTED -- 当前方法不支持事务,若有事务存在,则挂起当前事务。
    // Propagation.NEVER         -- 当前方法不支持事务,若有事务存在,则抛出异常。
    Propagation propagation() default Propagation.REQUIRED;

    // 事务隔离级别,有如下几个定义,该选项只适用于REQUIRED和REQUIRES_NEW事务传播级别
    // DEFAULT          -- 数据库缺省
    // READ_UNCOMMITTED -- 读未提交
    // READ_COMMITTED   -- 读已提交
    // REPEATABLE_READ  -- 可重复读
    // SERIALIZABLE     -- 串行
    Isolation isolation() default Isolation.DEFAULT;

    // 事务超时时间,缺省是数据库指定,该选项只适用于REQUIRED和REQUIRES_NEW事务传播级别
    int timeout() default -1;

    // 只读事务,该选项只适用于REQUIRED和REQUIRES_NEW事务传播级别
    boolean readOnly() default false;

    // 事务回滚异常类
    // 多个回滚条件可以通过如下指定
    // @Transactional(rollbackFor = {Exception.class, Error.class})
    Class<? extends Throwable>[] rollbackFor() default {};

    // 事务回滚异常类,必须继承自Throwable
    String[] rollbackForClassName() default {};

    // 事务回滚异常类过滤条件
    Class<? extends Throwable>[] noRollbackFor() default {};

    // 事务回滚异常类过滤条件,必须继承自Throwable
    String[] noRollbackForClassName() default {};
}

6. 参考资料

通过Mysql分析Redis的内存快照数据

图片来自pixabay.com的mrgajowy3会员

本文描述了如何将Redis的内存快照数据导出,然后导入到Mysql,通过Mysql实现对Redis的内存数据分析,获取键值总数、内存消耗最大键值等信息。

1. 环境版本

本文的演示环境如下,

  • Mysql 5.7(安装在windows)
  • Python 3.6(安装在linux)
  • Redis 3.2.5(安装在linux)

2. 获取Redis内存快照

登录Redis客户端,执行bgsave命令,此命令将当前Redis的内存快照保存为dump.rdb文件

$ ./redis-3.2.5/bin/redis-cli -h localhost
localhost:6379> bgsave
Background saving started

Redis服务端将显示如下日志,

8961:M 01 Apr 21:12:06.419 * Background saving started by pid 9505
9505:C 01 Apr 21:12:06.440 * DB saved on disk
9505:C 01 Apr 21:12:06.441 * RDB: 0 MB of memory used by copy-on-write
8961:M 01 Apr 21:12:06.533 * Background saving terminated with success

生成的dump.rdb文件位于,

  • 缺省文件位置:./redis-3.2.5/bin/dump.rdb
  • 自定义文件位置:见redis.conf文件中的配置项dbfilename dump.rdb,该配置项可以改变dump.rdb文件的保存位置。

3. 通过rdb工具导出csv格式数据

3.1 安装rdb工具

工具rdb是一个Python工具,需要预先安装Python,然后执行如下命令,

# 创建虚拟环境
python -m venv venv

# 激活虚拟环境
# - 在windows系统
venv\Scripts\activate.bat
# - 在linux系统
source venv\Scripts\activate

# 安装依赖
pip install rdbtools
pip install python-lzf

注1:Windows7上安装rdbtools,会出现如下报错信息。根据提示,rdbtools需要安装Microsoft Visual C++ 14.0。

error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/

注2:安装python-lzf,这是由于在下一步解析dump文件时,该工具能够加快解析速度。不安装的话,将会有提示信息:Parsing dump file will be very slow unless you install it。

3.2 导出csv格式数据

在安装rdbtools好了后,执行如下命令,通过rdb工具解析dump文件为csv文件。

rdb -c memory dump.rdb > memory.csv

打开memory.csv文件,可以看到里面内容格式如下,

database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry
0,string,username,72,string,8,8,
0,string,test,48,string,8,8,
......

里面包括了如下的redis键值信息,

  • 数据库
  • 键类型
  • 键名
  • 占用内存空间大小
  • 编码
  • 元素个数
  • 最大元素大小
  • 失效时间

接下来就可以将memory.csv文件导入到Mysql进行下一步的统计分析。

4. 导入csv内存快照数据到Mysql

  1. 打开Mysql客户端,执行如下sql(请预先创建好数据库datareport),创建表memory,
    DROP TABLE IF EXISTS `datareport`.`memory`;
    create table IF NOT EXISTS `datareport`.`memory` (
    `database` int,
    `type` varchar(128),
    `key` varchar(128),
    `size_in_bytes` int,
    `encoding` varchar(128),
    `num_elements` int,
    `len_largest_element` int,
    `expiry` varchar(128),
    KEY `idx_type` (`type`),
    KEY `idx_key` (`key`),
    KEY `idx_size_in_bytes` (`size_in_bytes`),
    KEY `idx_num_elements` (`num_elements`),
    KEY `idx_len_largest_element` (`len_largest_element`)
    );
    
  2. 复制文件memory.csv到C:/ProgramData/MySQL/MySQL Server 5.7/Uploads目录下。
    • 打开复制的memory.csv文件,删除如下第一行表头数据。
    database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry
    
  3. 执行如下sql命令,加载csv数据到表
    LOAD DATA INFILE'C:/ProgramData/MySQL/MySQL Server 5.7/Uploads/memory.csv' replace INTO TABLE `datareport`.`memory` FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' ESCAPED BY '"' LINES TERMINATED BY '\n';
    

    根据数据文件大小和机器配置,该加载过程时间会比较长,150MB的memory.csv文件耗时可能达1个小时。

5. 通过Mysql分析Redis内存数据

数据导入到数据库后,接下来就可以进行一些数据统计分析了,下面给出几个数据统计样例。

  1. 查询key个数
    SELECT COUNT(*) FROM `memory`;
    
  2. 查询总的内存占用
    SELECT SUM(size_in_bytes) FROM `memory`;
    
  3. 查询内存占用最高的前10个key
    SELECT * FROM `memory` ORDER BY size_in_bytes DESC LIMIT 10;
    
  4. 查询成员个数1000个以上的list/hash
    SELECT * FROM `memory` WHERE TYPE='quicklist' AND num_elements > 1000;
    
  5. 查询成员个数最多的前100个hash/set/sortedset
    SELECT * FROM MEMORY WHERE TYPE='hash' ORDER BY num_elements DESC LIMIT 100;
    SELECT * FROM MEMORY WHERE TYPE='set' ORDER BY num_elements DESC LIMIT 100;
    SELECT * FROM MEMORY WHERE TYPE='sortedset' ORDER BY num_elements DESC LIMIT 100;
    
  6. 保存数据到文件中
    SELECT * FROM `memory` ORDER BY size_in_bytes DESC LIMIT 10 INTO OUTFILE 'C:/ProgramData/MySQL/MySQL Server 5.7/Uploads/total.csv' FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"' ESCAPED BY '"' LINES TERMINATED BY '\r\n';
    

6. 参考资料

  1. 阿里云:Redis 内存分析方法