Skip to content

远程设计中的RPC框架设计梳理

后续会负责远程对接微服务的具体实现,在准备微服务那边的基础内容之前,先对新设计的RPC框架进行梳理,看看具体的逻辑是什么

📝 框架

整体框架图如下:

Untitled

基础前置知识 HTTP调用与RPC调用的关系

Client

基础数据

java
public class Stub<T> {

    private String serviceName;

    private Class<T> serviceClass;

    private final Map<AnyKey<?>, MetaData> metaDataMap = new HashMap<>();

    private T proxy;
}

stub是干什么的?——合理猜测,正常来讲stub是存根,可能存放服务端的地址信息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方

stub怎么用——目前在代码中看,只有注册服务的时候创建了对应的Stub

java
private void registerService(MetaData metaData) {

        String serviceName = metaData.getService();
        StringKey serviceNameKey = StringKey.of(serviceName);

        Class<?> serviceClass = metaData.getProxyClass();
        ClassKey serviceClassKey = ClassKey.of(serviceClass);

        Function<AnyKey<?>, Stub<?>> serviceFunc = anyKey -> Stub.of(serviceName, serviceClass);

        Stub<?> service = serviceMap.computeIfAbsent(serviceNameKey, serviceFunc);
        serviceMap.putIfAbsent(serviceClassKey, service);

        service.addMetaData(metaData);
    }

再往上看,什么时候注册服务

java
public <T> void registerStub(Class<T> stubClass) {
        Method[] methods = stubClass.getMethods();
        Arrays.stream(methods)
                .map((method) -> parser.parse(stubClass, method))
                .filter(Objects::nonNull)
                .forEach(this::registerService);
    }

总结一下,在ServiceStubPool中注册一个存根类,这个方法获取给定存根类的所有方法,使用提供的解析器进行解析, 并将结果的元数据注册到ServiceStubPool中。

再详细点就是:

  1. 获取**stubClass类中的所有方法,通过getMethods()方法得到Method数组,对每个方法,通过parser.parse(stubClass, method)**进行解析
  2. parser包含两种
    • HttpMetaDataParser :http形式的解析器
    • WorkV2MetaDataParser :更像是一个兼容包装,如果方法没有Service注解,就走WorkV1MetaData 的兼容逻辑,走WorkV1Invocation 的兼容调用逻辑
    • 兼容逻辑介绍:
      • Invocation :存储新的调用的原始信息 ,包路径:com.fr.workspace.rpc.invocation
      • WorkV1Invocation :内部存放了com.fr.rpc.Invocation 的Invocation,这玩意是旧RPC的调用,走兼容等于走旧的RPC调用的逻辑
  3. 将解析出来的元数据(包含新的,也可能包含兼容的)一个个注册进去ServiceStubPool ,形成一个具体调用Map
  • [x] 服务端 HTTPPaser中Endpoint是干啥的,RequestMapping可以用来接受请求,那Endpoint是干啥的
    • Endpoint注解只是标记,作定位符使用,关键还是在request注解

元数据存储的是一次服务的行为

  • 服务名称
  • 中止点
  • 代理的类
  • 方法
  • 参数的类型与返回的类型
  • 发送接收的方案mode
  • 额外的其他属性Attributes(暂不确定有啥用,只是保留扩展)
java
public class MetaData {  
    private String service;  
    private String endpoint;  
    private Class<?> proxyClass;  
    private Method method;  
    private Type[] parameterTypes;  
    private Type returnType;  
    private TransferMode sendMode = TransferMode.Common;  
    private TransferMode retMode = TransferMode.Common;    
    private Attributes attributes = new Attributes();

那么什么时候会在pool中注册一个存根类呢?

Untitled

主要是两个时机

  • 一个是创建RPC客户端的时候,会创建服务存根池,将FineObjectPool 中的已有服务都解析存入服务池
  • 另一个是后续添加的时候,同步添加到存根池

看上去其实就是连接的时候会将当前已有的RPC服务都处理一遍加进来,后续如果有注册也会同步做添加处理

**小结:**创建RPC客户端的时候会同步创建服务存根池,将当前的可用的服务调用都存进来,实现了新的RPC方式就存MetaData ,不然就存兼容的 WorkV1MetaData ,整体封装好后,一个RPCClient就包含了所有可用服务的具体信息,后续调用的时候走的Invocation也都是各自已经处理好的新老逻辑

具体调用

基础的stub了解差不多了,就要开始看具体的调用逻辑了,目前来看,stub更像是将服务抽象成metadata,封装好整个基础属性,后续调用大概率还是走的proxy

客户端协议(Protocol

java
HttpProtocol protocol = HttpProtocol
                .newBuilder()
                .withContext(new RpcContext(id))
                .withMsgParser(new WorkV2MsgParser(new HttpTransportMessageParser()))
                .withClientProvider(transportClients)
                .withFilterChain(FilterChain.create(new MetricFilter(), new AsyncFilter()))
                .withInterceptors(
                        // http-jdk-rpc
                        create4WorkV1(),
                        // http-json-raw
                        create4JsonRaw(),
                        // http-json-rpc
                        create4JsonRpc()
                )
                .build();

创建RPCClient的时候,会同步创建客户端协议并注册进客户端内

创建客户端协议的主要流程

  1. 利用建造者模式去构造协议
  2. 利用上下文的唯一ID去构造RPC上下文
  3. 构造消息解析器
    1. 当前只有HttpTransportMessageParser,会根据不同的传输模式(正常RPC、Stream、IO)有不同的处理
    2. WorkV2MsgParser更像是做了个包装,完善HTTPMessage整体调用的逻辑(为啥不在里面的parser就做好? 存疑)
  4. 注册传输客户端,用来发送,关闭数据
  5. 创建过滤链
  6. 注册拦截组
    1. *create4WorkV1*() :兼容, Service 注解如果没有就accept
    2. *create4JsonRaw :*Request 注解是否存在,存在就accept(raw指的是啥)
    3. *create4JsonRpc :*Request 注解不存在就accpet

到目前为止,RPCClient的结构大概就清晰了

Untitled

如何获取服务

java
public <T> T getService(String serviceName) {

        Stub<T> stub = (Stub<T>) stubPool.getStub(serviceName);
        return ServiceStubProxy.proxy(stub, new ServiceInvocationHandler(stubPool, protocol));
    }

public <T> T getService(Class<T> clazz) {

        Stub<T> stub = stubPool.getStub(clazz);
        return ServiceStubProxy.proxy(stub, new ServiceInvocationHandler(stubPool, protocol));
    }

目前看提供了两个获取Service的方式

  • 一种是通过服务的名称来获取
  • 另一种是直接通过获取服务的Class来获取对应的服务

获取Service的流程大概如下

  1. 从RPC客户端的服务池里面拿到我们封装好的服务存根

  2. 根据RPC客户端的客户端协议和服务池来包装服务调用拦截(处理)器

  3. ServiceStubProxy 进行组装调用

    • 组装的时候会根据客户端协议来组装clientInvoker ,其中包含了具体FineClientInvoker(最后调用),以及FineClientInvoker 前的拦截链路
    • 构造invoker的时候会利用filterChain过滤器链,链式传递 Invocation, 并在过滤器中处理业务逻辑
    java
    public Invoker invoker() {
            return filterChain.origin(new FineClientInvoker(this)).build();
    }

📌 构造invoker的过程详细如下:

  • new一个客户端调用器,位于调用链的最后一个,负责
    • 1. 获取发送客户端
    • 2. 拼凑信息
    • 3. 发送、收取
    • 4. 处理响应
  • origin 设置起始 Invoker 对象
  • build 构造如下链路,实际上就是先注册的filter后执行

Untitled

FineClientInvoker具体的invoke逻辑(使用给定的调用调用远程方法)

java
public Result invoke(Invocation invocation) throws Exception {

        try {
            // get channel
            TransportClient channel = this.clientProvider.getClient();

            // genTemplate
            Message message = messageParser.parse(invocation);

            // interceptor select
            InterceptorGroup group = interceptorSelector.select(message);

            // intercept msg
            MessageInterceptor msgInterceptor = group.getMsgInterceptor();
            msgInterceptor.intercept(message);

            // 通过数据管道 发送/获取 结果
            Result result = channel.send(message);

            // intercept result
            ResultInterceptor resultInterceptor = group.getResultInterceptor();
            resultInterceptor.intercept(result);

            return result;
        } catch (FuncException sre) {
            return new CommonResult(sre.getCause());
        } catch (Throwable e) {
            return new CommonResult(e);
        }
    }
  • 先通过TransportClient来获取通道
  • 获取具体的信息
  • 拦截组匹配
  • 信息处理
  • 通过channel发送/获取结果
  • 拦截结果并处理
  • 返回结果

client总结

连接过程

Untitled

  • 有Service注解而无Request注明方法,则走新RPC
  • 有Service注解并有Request注明方法,走HTTP
  • 无Service,走兼容

Untitled

最后更新于: