嗨,老铁,欢迎来到我的博客!

如果觉得我的内容还不错的话,可以关注下我在 segmentfault.com 上的直播。我主要从事 PHP 和 Java 方面的开发,《深入 PHP 内核》作者之一。

[视频直播] PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路 直播中我将毫无保留的分享我这六年的全部工作经验和踩坑的故事,以及会穿插着一些面试中的 考点难点加分点

周梦康 发表于 2015-12-12 3902 次浏览 标签 : Yar

在 yar 中规定的传输协议如下图所示,请求体为82个字节的yar_header_t和8字节的打包名称和请求实体yar_request_t,在yar_header_t里面用body_len记录8字节的打包名称+请求实体的长度;返回体类似,只是实体内容的结构体稍微不同,在reval里面才是实际最后客户端需要的结果。

整个传输以二进制流的形式传送。


在认识 Yar 协议之后,就是在 Java 中实现类似的数据结构和数据的二进制装包和解包,以及数据的传输了。为了记录自己一步步封装的过程,先使用比较过程的方式来实现这一个功能。

主要涉及到了三个类:

  1. YarClient类,传输和接受数据的实现;

  2. YarProtocol类,按照上图的传输协议来把yar_header_tpackager_name还有yar_request_tyar_response_t)写入二进制流,也就是拼装和解析传输数据;

  3. JsonPackager类,实现对yar_request_tyar_response_t的装包和拆包。

下面的代码只能提供整体的大概思路,如果要运行需要下载完整版的代码,在 Jdk 1.8 的环境下编译运行。

完整代码地址:https://github.com/zhoumengkang/notes/tree/1.0.0/java/yartest

又对下面的代码仿照 Yar 做了传输模块的抽象工厂等,代码地址:https://github.com/zhoumengkang/notes/tree/1.0.1/java/yartest

package yar;

import yar.packager.YarPackager;
import yar.protocol.YarRequest;
import yar.protocol.YarResponse;

import java.io.*;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;

/**
 * Created by zhoumengkang on 2/12/15.
 */

public class YarClient {
    private String uri;
    private String packager;

    public YarClient(String uri){
        this.uri = uri;
        this.packager = YarConfig.getString("yar.packager");
    }

    public final Object useService(Class type) {
        YarClientInvocationHandler handler = new YarClientInvocationHandler(this);
        if (type.isInterface()) {
            return Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, handler);
        } else {
            return Proxy.newProxyInstance(type.getClassLoader(), type.getInterfaces(), handler);
        }
    }

    public Object invoke(String method,Object[] args){

        YarRequest yarRequest = new YarRequest();
        yarRequest.setId(123456789);
        yarRequest.setMethod(method);
        yarRequest.setParameters(args);
        yarRequest.setPackagerName(this.packager);

        byte[] res = null;
        try {
            res = sendPost(this.uri, YarProtocol.requestCreate(yarRequest));
        } catch (IOException e) {
            e.printStackTrace();
        }

        YarResponse yarResponse = YarProtocol.responseFetch(res);
        assert yarResponse != null;
        return yarResponse.getRetVal();
    }



    public static byte[] sendPost(String url, byte[] content) {
        OutputStream out = null;
        InputStream in = null;
        byte[] b = null;
        try {
            URL realUrl = new URL(url);
            URLConnection conn = realUrl.openConnection();
            conn.setRequestProperty("accept", "*/*");
            conn.setRequestProperty("connection", "Keep-Alive");
            conn.setDoOutput(true);
            conn.setDoInput(true);

            out = conn.getOutputStream();
            out.write(content);
            out.flush();
            out.close();

            in = conn.getInputStream();
            b = new byte[in.available()];
            in.read(b);
            in.close();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (out != null) {
                    out.close();
                }
                if (in != null) {
                    in.close();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }


        return b;
    }

}
package yar;

import yar.packager.YarPackager;
import yar.protocol.*;

import java.io.*;
import java.util.Arrays;

/**
 * Created by zhoumengkang on 5/12/15.
 */
public class YarProtocol {

    public static final int YAR_PROTOCOL_MAGIC_NUM = 0x80DFEC60;
    public static final int YAR_HEADER_LENGTH = 82;
    public static final int YAR_PACKAGER_NAME_LENGTH = 8;

    public static YarHeader render(YarRequest yarRequest,int length){
        YarHeader yarHeader = new YarHeader();
        yarHeader.setId((int) yarRequest.getId());
        yarHeader.setMagicNum(YAR_PROTOCOL_MAGIC_NUM);
        yarHeader.setBodyLen(length);
        yarHeader.setProvider(YarConfig.getString("yar.provider"));
        yarHeader.setToken(YarConfig.getString("yar.token"));
        return yarHeader;
    }

    public static YarHeader parse(byte[] content){
        YarHeader yarHeader = new YarHeader();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(content));
        try {
            yarHeader.setId(in.readInt());
            yarHeader.setVersion(in.readShort());
            if (in.readInt() != YAR_PROTOCOL_MAGIC_NUM) {
                return null;
            }
            yarHeader.setReserved(in.readInt());

            byte[] provider = new byte[32];
            in.read(provider,0,32);
            yarHeader.setProvider(new String(provider));

            byte[] token = new byte[32];
            in.read(token,0,32);
            yarHeader.setToken(new String(token));

            yarHeader.setBodyLen(in.readInt());

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return yarHeader;
    }

    public static YarResponse responseFetch(byte[] responseByte){

        YarResponse yarResponse = new YarResponse();

        byte[] header = new byte[YAR_HEADER_LENGTH];
        for (int i = 0; i < YAR_HEADER_LENGTH; i++) {
            header[i] = responseByte[i];
        }

        YarHeader yarHeader = YarProtocol.parse(header);

        byte[] packager = new byte[YAR_PACKAGER_NAME_LENGTH];
        int packagerLength = 0;
        for (int i = 0; i < YAR_PACKAGER_NAME_LENGTH; i++) {
            packager[i] = responseByte[YAR_HEADER_LENGTH + i];
            // 在这个8字节中,当是 php 或者是 json 的时候,后面的三个或者四个字节可能之前已经被占用,需要截取下
            if (packager[i] == 0){
                packagerLength = i;
                break;
            }
        }

        String packagerName = new String(packager);
        YarClient.debug(packagerName);
        yarResponse.setPackagerName(packagerName.substring(0, packagerLength));

        int off = YAR_HEADER_LENGTH + YAR_PACKAGER_NAME_LENGTH;
        int len = responseByte.length;

        byte[] yarResponseBody = new byte[len];
        for (int i = off; i < len; i++) {
            yarResponseBody[i - off] = responseByte[i];
        }

        try {
            return YarPackager.get(yarResponse.getPackagerName()).unpack(yarResponseBody);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static byte[] requestCreate(YarRequest yarRequest) throws IOException {

        byte[] body;
        ByteArrayOutputStream bodyOut = new ByteArrayOutputStream();
        try {
            bodyOut.write(Arrays.copyOf(yarRequest.getPackagerName().toUpperCase().getBytes(), 8));
            bodyOut.write(YarPackager.get(yarRequest.getPackagerName()).pack(yarRequest));

            body = bodyOut.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            bodyOut.close();
        }

        YarHeader yarHeader = render(yarRequest,body.length);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(byteArrayOutputStream);

        try {
            out.writeInt(yarHeader.getId());
            out.writeShort(yarHeader.getVersion());
            out.writeInt(yarHeader.getMagicNum());
            out.writeInt(yarHeader.getReserved());
            out.write(Arrays.copyOf(yarHeader.getProvider().getBytes(),32));
            out.write(Arrays.copyOf(yarHeader.getToken().getBytes(),32));
            out.writeInt(yarHeader.getBodyLen());

            out.write(body);

            return byteArrayOutputStream.toByteArray();
        } finally {
            byteArrayOutputStream.close();
            out.close();
        }

    }

}
package yar.packager;

import org.json.JSONObject;
import yar.YarClient;
import yar.protocol.YarRequest;
import yar.protocol.YarResponse;

import java.util.Map;

/**
 * Created by zhoumengkang on 4/12/15.
 */
public class JsonPackager extends YarPackager {
    @Override
    public byte[] pack(YarRequest yarRequest) {
        Map<String,Object> request = requestFormat(yarRequest);
        JSONObject jsonObject = new JSONObject(request);
        String string = jsonObject.toString();
        return string.getBytes();
    }

    @Override
    public YarResponse unpack(byte[] content) {
        JSONObject jsonObject = new JSONObject(new String(content));
        YarClient.debug(jsonObject);
        YarResponse yarResponse = new YarResponse();
        yarResponse.setId(jsonObject.getLong("i"));
        yarResponse.setStatus(jsonObject.getInt("s"));
        yarResponse.setOut(jsonObject.getString("o"));
        yarResponse.setRetVal(jsonObject.get("r"));

        return yarResponse;
    }
}


嗨,老铁,欢迎来到我的博客!

如果觉得我的内容还不错的话,可以关注下我在 segmentfault.com 上的直播。我主要从事 PHP 和 Java 方面的开发,《深入 PHP 内核》作者之一。

[视频直播] PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路 直播中我将毫无保留的分享我这六年的全部工作经验和踩坑的故事,以及会穿插着一些面试中的 考点难点加分点

评论列表