PHP通过URL和WebSocket两种方法获取火币K线数据

全屏阅读
  • 基本信息

最近项目上线一个模块需要获取火币的K线数据,初期我用的Workerman定时任务每秒通过URL请求获取,做出来之后老板感觉数据实时性不强,要优化,没办法我只能继续研究,幸好在GitHub看到一个老哥写的通过WebSocket获取火币数据的,话不多说,下面直接开始上代码。第一次写博客,写的如果不好,还请大家见谅。

URL请求方法

发现这种方法实现起来也有坑,网上大部分都只是贴出接口文档和代码,但是实际操作会发现无法请求火币服务器,为啥,因为人家在国外,偶尔能请求概率也很低,所以代码只能放到外网服务器才能执行,这样一来开发调试就很麻烦。后面我就想了一个办法,找一台外网服务器,布置一个脚本代理请求(非火币的外网请求也可以),这样在国内也可以请求火币接口了,调试什么的方便多了。

下面贴出代理脚本代码:

$url = urldecode($_GET['url']);
if ($url) {
    echo curl_get($url);
    die();
}else{
  echo "How are you";
}
function curl_get($url, $timeout = 5)
{
    $ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE;
    $ch = curl_init();
    $headers = array(
        "Content-Type: application/json charset=utf-8",
        'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
    );

    $opt = array(
        CURLOPT_URL => $url,
        CURLOPT_HEADER => 0,
        CURLOPT_CUSTOMREQUEST => strtoupper('GET'),
        CURLOPT_RETURNTRANSFER => 1,
        CURLOPT_TIMEOUT => $timeout,
        CURLOPT_HTTPHEADER => $headers,
    );

    if ($ssl) {
        $opt[CURLOPT_SSL_VERIFYHOST] = false;
        $opt[CURLOPT_SSL_VERIFYPEER] = FALSE;
    }
    curl_setopt_array($ch, $opt);
    $result = curl_exec($ch);
    curl_close($ch);
    return $result;
}

获取数据的代码:

$now = time();
$diff = intval((strtotime(date('Y-m-d H:i:00'), $now) - $find['add_time']) / self::$time_list[$period]);
$size = $diff + 1 > 2000 ? 2000 : $diff + 1;
$url = "https://api.huobipro.com/market/history/kline?period={$period}&size={$size}&symbol={$symbol}";
$log .= ",url:{$url}";
//如果服务器在国内,需要把public目录下的post.php文件部署到外网服务器代理请求火币api
$post_url = 'http://xxx.com/post.php?url='.urlencode($url);
$log .= ",post_url:{$post_url}";
$res = self::curl_get($post_url, 5);
if (!$res) throw new Exception(lang('火币请求失败'));
$res = json_decode($res, true);
if ($res['status'] != 'ok') throw new Exception("火币网返回错误,err-code:{$res['err-code']},err-msg:{$res['err-msg']}");
if (empty($res['data'])) throw new Exception("火币网返回数据为空");
$huobi = $res['data'];
$ids = array_column($huobi,'id');
array_multisort($ids,SORT_ASC,$huobi);
$add_list = [];
$update_list = [];
foreach ($huobi as $key1 => $value1) {
    $where1 = $where;
    $where1['add_time'] = $value1['id'];
    $find1 = (new self)->where($where1)->order('id', 'desc')->find();
    if ($find1) {//记录已存在,更新已有记录
        $update_list[] = [
            'id'=>$find1['id'],
            'open_price'=>number_format($value1['open'],6,".",""),
            'close_price'=>number_format($value1['close'],6,".",""),
            'high_price'=>number_format($value1['high'],6,".",""),
            'low_price'=>number_format($value1['low'],6,".",""),
            'amount'=>number_format($value1['amount'],6,".",""),
            'count'=>number_format($value1['count'],6,".",""),
            'vol'=>number_format($value1['vol'],6,".",""),
            'ch'=>$res['ch'],
            //'add_time'=>$value1['id'],
            'update_time'=>time(),
        ];
    }
    else {
        $add_list[] = [
            'period'=>$period,
            'symbol'=>$symbol,
            'open_price'=>number_format($value1['open'],6,".",""),
            'close_price'=>number_format($value1['close'],6,".",""),
            'high_price'=>number_format($value1['high'],6,".",""),
            'low_price'=>number_format($value1['low'],6,".",""),
            'amount'=>number_format($value1['amount'],6,".",""),
            'count'=>number_format($value1['count'],6,".",""),
            'vol'=>number_format($value1['vol'],6,".",""),
            'ch'=>$res['ch'],
            'add_time'=>$value1['id'],
            'update_time'=>time(),
        ];
    }
}


static function curl_get($url, $timeout = 30)
{
    $ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE;
    $ch = curl_init();
    $headers = array(
        "Content-Type: application/json charset=utf-8",
        'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
    );

    $opt = array(
        CURLOPT_URL => $url,
        CURLOPT_HEADER => 0,
        CURLOPT_CUSTOMREQUEST => strtoupper('GET'),
        CURLOPT_RETURNTRANSFER => 1,
        CURLOPT_TIMEOUT => $timeout,
        CURLOPT_HTTPHEADER => $headers,
    );

    if ($ssl) {
        $opt[CURLOPT_SSL_VERIFYHOST] = false;
        $opt[CURLOPT_SSL_VERIFYPEER] = FALSE;
        $opt[CURLOPT_SSLVERSION] = 3;
    }
    curl_setopt_array($ch, $opt);
    $result = curl_exec($ch);
    if (!$result) {
        $error = curl_error($ch);
        $errno = curl_errno($ch);
        Log::write("curl_get,url:{$url},error:{$error},error:{$errno}", 'INFO');
    }
    curl_close($ch);
    return $result;
}

WebSocket方法获取

这种方法主要是把服务器当成一个WebSocket Client连接到火币的WebSocket服务器,订阅交易对的不同时间粒度的K线数据,成功之后火币服务器就会在K线数据变化的时候主动推送消息给到服务器,接收到推送之后就可以进行存储等操作,同时服务器也把接收到的推送消息再推送给连接到服务端的WebSocket Client。

因为火币服务器在外网,所以采用WebSocket方法也需要部署一台国外服务器,这台服务器用于向火币订阅数据并接收火币推送,其他国内服务器就可以通过连接这台服务器获取火币K线数据。这种应该也可以分布式部署,大家可以试一下。结构图如下:

image.png

连接到火币服务器代码:

$info = "连接到服务器:{$this->host}";
echo "\r\n".$info;
$this->saveLog("huobi", $info);
// 异步建立一个到火币服务器的连接
$con = new AsyncTcpConnection($this->host);
if ($this->flag) {//正式环境
    $con->transport = 'ssl';
}

$con->onConnect = function($con)
{
    $this->onAsyncConnect($con);
};

// 当服务器连接发来数据时,转发给对应客户端的连接
$con->onMessage = function($con, $message) use($worker)
{
    $this->onAsyncMessage($con, $message, $worker);
};

$con->onError = function($con, $err_code, $err_msg)
{
    echo "$err_code, $err_msg";
    $info = "Async onError err_code:{$err_code},err_msg:{$err_msg}";
    echo "\r\n ".$info;
    $this->saveLog("huobi", $info);
};

$con->onClose = function($con)
{
    $info = "Async onClose";
    echo "\r\n ".$info;
    $this->saveLog("huobi", $info);
    $this->reconnect_num++;//重连次数+1

    //重连之前先更新K线数据
    $info = "更新K线数据-重连之前-start:".date('Y-m-d H:i:s');
    echo "\r\n ".$info;
    $this->saveLog("huobi", $info);
    foreach ($this->trade_list as $value) {
        $symbol = $value;
        foreach ($this->time_list as $k => $v) {
            $info = "create_kline:{$symbol}-{$k}";
            echo "\r\n ".$info;
            $this->saveLog("huobi", $info);
            $r = \app\common\model\TradeKlineKline::create_kline($symbol, $k);
            if ($r['code'] == SUCCESS) {

            }
        }
    };
    $info = "更新K线数据-重连之前-end:".date('Y-m-d H:i:s');
    echo "\r\n ".$info;
    $this->saveLog("huobi", $info);

    // 如果连接断开,则在1秒后重连
    $con->reConnect(1);
};

// 执行异步连接
$con->connect();

//连接火币成功回调方法
function onAsyncConnect($con)
 {
    $this->async_message_time = time();
    $info = "连接到服务器:{$this->host},成功";
    echo "\r\n".$info;
    $this->saveLog("huobi", $info);

    $info = "开始订阅K线数据";
    echo "\r\n".$info;
    $this->saveLog("huobi", $info);
    //$this->saveLog("huobi", 'onAsyncConnect:'.print_r($con, true));
    $this->saveLog("huobi", 'onAsyncConnect,cid:'.$con->id.',reconnect_num:'.$this->reconnect_num);
    $make = explode(',', TradeConfig::get_value('trade_kline_symbols', 'btcusdt,ethusdt,eosusdt,ltcusdt,etcusdt'));
    $this->huobi_id = $con->id;
    foreach ($make as $key => $value) {
        $symbol = $value;
        foreach ($this->time_list as $k => $v) {
            $info = "sub:{$symbol}-{$k}";
            echo "\r\n".$info;
            $this->saveLog("huobi", $info);
            $data = json_encode([                         //行情
                'sub' => "market." . $symbol . ".kline." . $k,
                'id' => "id" . time(),
                'freq-ms' => 5000
            ]);
            $con->send($data);
        }
    }
    /*foreach ($this->trade_list as $key => $value) {
        $symbol = $key;
        foreach ($this->time_list as $k => $v) {
            echo "sub:{$symbol}-{$k}\r\n";
            $data = json_encode([                         //行情
                'sub' => "market." . $symbol . ".kline." . $k,
                'id' => "id" . time(),
                'freq-ms' => 5000
            ]);
            $con->send($data);
        }
    };*/
}

//接收到火币推送回调方法
function onAsyncMessage($con, $message, $worker)
{
    $data = json_decode($message, true);
    if (!$data) {//说明采用了GZIP压缩
        $data = gzdecode($message);
        $this->saveLog("huobi", $data);
        $data = json_decode($data, true);
    }
    else {
        $this->saveLog("huobi", $message);
    }
    if(isset($data['ping'])) {
        $this->async_message_time = time();
        $con->send(json_encode([
            "pong" => $data['ping']
        ]));

        // 给客户端心跳
        foreach($this->all_cons as $kk=>$vv){
            if (array_key_exists($vv["sid"], $worker->connections)) {
                $info = "\r\n sid ".$vv["sid"]." send ping";
                echo $info;
                $this->saveLog("all", $info);

                $worker->connections[$vv["sid"]]->send(json_encode($data));
            }
            else {
                unset($this->all_cons[$kk]);
            }
        }
    } else if (isset($data['ch'])) {
        $this->async_message_time = time();
        $info = "接收到推送,ch:{$data['ch']}";
        echo "\r\n".$info;
        $this->saveLog("huobi", $info);
        //Log::write(print_r($data, true), 'INFO');

        $symbol = $data["ch"];
        $info = "\r\n  on mess size:".sizeof($this->all_cons)." conn-size: ".sizeof($worker->connections)." symbol:".$symbol;
        echo $info;
        $this->saveLog("all", $info);
        $pieces = explode(".", $data['ch']);
        switch ($pieces[2]) {
            case "kline":              //行情图
                $market = $pieces[1];  //火币对
                if (in_array($market, $this->symbol_list)) {
                    $period = $pieces[3];
                    $tick = $data['tick'];
                    //tick 说明
                    //"tick": {
                    //  "id": K线id,
                    //  "amount": 成交量,
                    //  "count": 成交笔数,
                    //  "open": 开盘价,
                    //  "close": 收盘价,当K线为最晚的一根时,是最新成交价
                    //  "low": 最低价,
                    //  "high": 最高价,
                    //  "vol": 成交额, 即 sum(每一笔成交价 * 该笔的成交量)
                    //}
                    $id = $tick['id'];
                    $where = [
                        'period'=>$period,
                        'symbol'=>$market,
                        'add_time'=>$id,
                    ];
                    $find1 = \app\common\model\TradeKline::where($where)->order('id', 'desc')->find();
                    if ($find1) {//记录已存在,更新已有记录
                        if ($find1['open_price'] != $tick['open'] ||
                            $find1['close_price'] != $tick['close'] ||
                            $find1['high_price'] != $tick['high'] ||
                            $find1['low_price'] != $tick['low'] ||
                            $find1['amount'] != $tick['amount'] ||
                            $find1['count'] != $tick['count'] ||
                            $find1['vol'] != $tick['vol']) {//没有数据变化不做更新
                            $update_list[] = [
                                'id'=>$find1['id'],
                                'open_price'=>number_format($tick['open'],6,".",""),
                                'close_price'=>number_format($tick['close'],6,".",""),
                                'high_price'=>number_format($tick['high'],6,".",""),
                                'low_price'=>number_format($tick['low'],6,".",""),
                                'amount'=>number_format($tick['amount'],6,".",""),
                                'count'=>number_format($tick['count'],6,".",""),
                                'vol'=>number_format($tick['vol'],6,".",""),
                                'update_time'=>time(),
                            ];
                            $kline = new \app\common\model\TradeKline;
                            $res2 = $kline->isUpdate()->saveAll($update_list);
                            if (empty($res2)) {
                                var_dump(lang('更新记录失败-2').'-in line:'.__LINE__);
                                //throw new Exception(lang('更新记录失败-2').'-in line:'.__LINE__);
                            }
                        }
                    }
                    else {
                        $add_list[] = [
                            'period'=>$period,
                            'symbol'=>$market,
                            'open_price'=>number_format($tick['open'],6,".",""),
                            'close_price'=>number_format($tick['close'],6,".",""),
                            'high_price'=>number_format($tick['high'],6,".",""),
                            'low_price'=>number_format($tick['low'],6,".",""),
                            'amount'=>number_format($tick['amount'],6,".",""),
                            'count'=>number_format($tick['count'],6,".",""),
                            'vol'=>number_format($tick['vol'],6,".",""),
                            'ch'=>$data['ch'],
                            'add_time'=>$id,
                            'update_time'=>time(),
                        ];
                        $kline = new \app\common\model\TradeKline;
                        $res1 = $kline->saveAll($add_list);
                        if (empty($res1)) {
                            var_dump(lang('插入记录失败').'-in line:'.__LINE__);
                            //throw new Exception(lang('插入记录失败').'-in line:'.__LINE__);
                        }
                    }
                }

                break;
        }

        $time_1 = microtime(true);

        if (array_key_exists($symbol, $this->all_symbols)) {
            foreach ($this->all_symbols[$symbol] as $key => $val) {
                $info = " symbol ".$symbol." | ch ".$data["ch"]." sid ".$val." send \r\n";
                echo $info;
                $this->saveLog("all", $info);

                $worker->connections[$val]->send(json_encode($data));
            }
        }
        $time_2 = microtime(true);
        $cost = $time_2 - $time_1;
        if ($cost > 1) {
            $info = " symbol ".$symbol." | ch ".$data["ch"]." cost {$cost} \r\n";
            echo $info;
            $this->saveLog("all", $info);
        }

    }
    else {
        echo "undefind message\r\n";
        var_dump($data);
    }
}

huobiwebsocket.zip

顶一下
(0)
100%
订阅 回复
踩一下
(0)
100%
» 郑重声明:本文由mpxq168发布,所有内容仅代表个人观点。版权归恒富网mpxq168共有,欢迎转载, 但未经作者同意必须保留此段声明,并给出文章连接,否则保留追究法律责任的权利! 如果本文侵犯了您的权益,请留言。

目前有 0 条留言 其中:访客:0 条, 博主:0 条

给我留言

您必须 [ 登录 ] 才能发表留言!