嗨,老铁,欢迎来到我的博客!

如果觉得我的内容还不错的话,可以关注下我在 segmentfault.com 上的直播。我主要从事 PHP 和 Java 方面的开发,《深入 PHP 内核》作者之一。

[视频直播] PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路 直播中我将毫无保留的分享我这六年的全部工作经验和踩坑的故事,以及会穿插着一些面试中的 考点难点加分点

周梦康 发表于 2015-12-15 2832 次浏览 标签 : Yar

官方 php 客户端文档如下

Yar_Concurrent_Client {
    /* 属性 */
    static $_callstack ;
    static $_callback ;
    static $_error_callback ;
    /* 方法 */
    public static int call ( string $uri , string $method , array $parameters [, callable $callback ] )
    public static boolean loop ([ callable $callback [, callable $error_callback ]] )
    public static boolean reset ( void )
}

也就是说call方法实际实在注册并行的服务调用,loop是统一发送,reset是清空调用任务集。下面我也需要实现上面类似的功能。

首先学习下ExecutorService 的使用

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by zhoumengkang on 16/12/15.
 */
public class YarConcurrentClient {

    private static ExecutorService executorService;

    static{
        poolInit();
    }

    private static void poolInit(){
        executorService = Executors.newCachedThreadPool();
    }

    public void call() throws ExecutionException, InterruptedException {
        List<Future<String>> result =new ArrayList<Future<String>>();
        for (int i = 0; i < 4; i++) {
            Future<String> future = executorService.submit(new YarClientCallable(i));
            result.add(future);
        }
        for(Future<String> future:result){
            System.out.println("返回值:"+ future.get());
        }

    }

    public class YarClientCallable implements Callable<String> {

        private int seq;

        public YarClientCallable(int seq) {
            this.seq = seq;
        }

        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName());
            Thread.sleep(3000);
            System.out.println("Weak up" + seq);
            return "完成" + seq;
        }
    }

}

测试下call方法

import junit.framework.TestCase;


/**
 * Created by zhoumengkang on 16/12/15.
 */
public class YarConcurrentClientTest extends TestCase {
    public void testName() throws Exception {
       new YarConcurrentClient().call();
       new YarConcurrentClient().call();
       new YarConcurrentClient().call();
    }
}

测试结果为

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
Weak up1
Weak up3
Weak up2
Weak up0
返回值:完成0
返回值:完成1
返回值:完成2
返回值:完成3
pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
Weak up1
Weak up2
Weak up3
Weak up0
返回值:完成0
返回值:完成1
返回值:完成2
返回值:完成3
pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
Weak up0
返回值:完成0
Weak up3
Weak up2
Weak up1
返回值:完成1
返回值:完成2
返回值:完成3

Thread.currentThread().getName()可知,只生成了一个线程池,并且该池里的4个线程也被被重复利用了。

YarConcurrentClient 雏形

package yar.concurrent.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import yar.YarConfig;
import yar.protocol.YarRequest;
import yar.protocol.YarResponse;
import yar.transport.YarTransport;
import yar.transport.YarTransportFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * Created by zhoumengkang on 2/12/15.
 */
public class YarConcurrentClient {

    protected final static Logger logger = LoggerFactory.getLogger(YarConcurrentClient.class);

    private static ExecutorService executorService;
    private static List<YarConcurrentTask> yarConcurrentTasks;

    static{
        init();
    }

    private static void init(){
        yarConcurrentTasks = new ArrayList<YarConcurrentTask>();
        executorService = Executors.newCachedThreadPool();
    }

    public static void call(YarConcurrentTask yarConcurrentTask) {
        yarConcurrentTasks.add(yarConcurrentTask);
    }

    public static void loop() {

        List<Future<Object>> result =new ArrayList<Future<Object>>();

        try{
            for (YarConcurrentTask task : yarConcurrentTasks){
                Future<Object> future = executorService.submit(new YarClientCallable(task));
                result.add(future);
            }

        }catch(Exception e){

        }


        for(Future<Object> future:result){
            try {
                logger.info("返回值"+future.get().toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void reset(){
        yarConcurrentTasks = null;
        yarConcurrentTasks = new ArrayList<YarConcurrentTask>();
    }

    public static class YarClientCallable implements Callable<Object> {

        private YarConcurrentTask yarConcurrentTask;

        public YarClientCallable(YarConcurrentTask yarConcurrentTask) {
            this.yarConcurrentTask = yarConcurrentTask;
        }

        public Object call() throws Exception {

            logger.debug("开始处理任务" + yarConcurrentTask.getId());

            YarResponse yarResponse = null;

            YarRequest yarRequest = new YarRequest();
            yarRequest.setId(yarConcurrentTask.getId());
            yarRequest.setMethod(yarConcurrentTask.getMethod());
            yarRequest.setParameters(yarConcurrentTask.getParams());
            yarRequest.setPackagerName(YarConfig.getString("yar.packager"));

            YarTransport yarTransport = YarTransportFactory.get(YarConfig.getString("yar.transport"));
            yarTransport.open("http://10.211.55.4/yar/server/RewardScoreService.class.php");

            try {
                yarResponse = yarTransport.exec(yarRequest);
            } catch (IOException e) {
                e.printStackTrace();
            }
            assert yarResponse != null;

            return yarResponse.getRetVal();
        }
    }

}


嗨,老铁,欢迎来到我的博客!

如果觉得我的内容还不错的话,可以关注下我在 segmentfault.com 上的直播。我主要从事 PHP 和 Java 方面的开发,《深入 PHP 内核》作者之一。

[视频直播] PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路 直播中我将毫无保留的分享我这六年的全部工作经验和踩坑的故事,以及会穿插着一些面试中的 考点难点加分点

评论列表