REST APIサービスを用意する。

これまで作ってきたプログラムにWeb APIサービスを追加します。

  • tcp://localhost:8739
  • tcp://localhost:8740

にポートフォワードを設定して、Windows上のJupyter Notebookで実行しています。

このipynbはGitHubにあります。

WebAPI, ZeroMQ サーバーテスト用のクラス

In [1]:
import zmq
import json
import requests
import pandas as pd
import numpy as np
from io import StringIO

class UserAgent:
    # HTTP - localhost : 8739 はなさく
    WEBAPI_ADDR = "http://localhost:8739"
    # ZMQ - localhost : 8740 はなしを
    ZMQPUB_ADDR = "tcp://localhost:8740"
    #
    def __init__( self, topic ):
        self.context = zmq.Context()
        self.topic = topic
    #
    def subscribe_zmq(self):
        self.publisher = self.context.socket( zmq.SUB )
        self.publisher.setsockopt( zmq.SUBSCRIBE,  self.topic )
        self.publisher.connect( self.ZMQPUB_ADDR )
    #
    @classmethod
    def get_server_health(cls):
        r = requests.get( cls.WEBAPI_ADDR + "/health" )
        return r.json()
    #
    @classmethod
    def get_server_version(cls):
        r = requests.get( cls.WEBAPI_ADDR + "/version" )
        return r.json()
    #
    @classmethod
    def get_portfolio(cls):
        r = requests.get( cls.WEBAPI_ADDR + "/portfolio" )
        df = pd.read_json( r.content, orient='records' )
        return df.set_index('code')
    #
    @classmethod
    def get_history_csv( cls, code ):
        r = requests.get( f'{cls.WEBAPI_ADDR}/stocks/history/csv/?code={code}' )
        return r.content.decode( 'utf-8' )
    #
    @classmethod
    def get_history_web( cls, code ):
        r = requests.get( f'{cls.WEBAPI_ADDR}/stocks/history/js/?code={code}' )
        df = pd.read_json( r.content, orient='records' )
        return df.set_index('at')
    #
    @classmethod
    def req_history_zmq( cls, code ):
        r = requests.get( f'{cls.WEBAPI_ADDR}/publish?code={code}' )
        return r
    #
    def read_pub_zmq( self ):
        vs = []
        while True:
            [topic, bytestr] = self.publisher.recv_multipart()
            contents = bytestr.decode( 'utf-8' )
            v = pd.read_json( contents, orient='records' )
            if len( v ) > 0:
                # 送られてきたリストをvsに追加する
                vs.append( v )
            else:
                # リストの長さが0つまり"[]"が送られてきたら
                # 送るべき情報が無いという意味なので、それを確認したら終了する
                break;
        return topic, pd.concat(vs)

テストを始める

インスタンス作成

購読するトピックを渡してインスタンスを作り、ZeroMQ – PUB / SUB 通信を購読する

In [2]:
# すべて購読する
topic = b'' 
useragent = UserAgent( topic )
useragent.subscribe_zmq()

試しにサーバのバージョンを取得してみる

In [3]:
version = useragent.get_server_version()
version
Out[3]:
{'author': 'Akihiro Yamamoto',
 'copyright': '(c) 2016 Akihiro Yamamoto.',
 'github': 'ak1211/tractor',
 'license': 'AGPL-3',
 'maintainer': 'ak1211@mail.ak1211.com',
 'name': 'tractor',
 'version': '0.4.3'}

試しにサーバの状態を取得してみる

In [4]:
healthy = useragent.get_server_health()
healthy
Out[4]:
{'hNumCapabilities': 1,
 'hNumProcessors': 2,
 'hNumSparks': 0,
 'hStats': {'allocated_bytes': 11086176,
  'copied_bytes': 3771512,
  'cpu_ns': 112000000,
  'cumulative_live_bytes': 6818976,
  'cumulative_par_max_copied_bytes': 0,
  'elapsed_ns': 32207374128,
  'gc': {'gcdetails_allocated_bytes': 128696,
   'gcdetails_compact_bytes': 0,
   'gcdetails_copied_bytes': 147000,
   'gcdetails_cpu_ns': 4000000,
   'gcdetails_elapsed_ns': 2153273,
   'gcdetails_gen': 1,
   'gcdetails_large_objects_bytes': 172552,
   'gcdetails_live_bytes': 319152,
   'gcdetails_mem_in_use_bytes': 2097152,
   'gcdetails_par_max_copied_bytes': 0,
   'gcdetails_slop_bytes': 24912,
   'gcdetails_sync_elapsed_ns': 5541,
   'gcdetails_threads': 1},
  'gc_cpu_ns': 96000000,
  'gc_elapsed_ns': 95037068,
  'gcs': 44,
  'major_gcs': 35,
  'max_compact_bytes': 0,
  'max_large_objects_bytes': 172552,
  'max_live_bytes': 319152,
  'max_mem_in_use_bytes': 2097152,
  'max_slop_bytes': 25360,
  'mutator_cpu_ns': 16000000,
  'mutator_elapsed_ns': 32112337060,
  'par_copied_bytes': 0}}
In [5]:
print( f"{healthy['hNumProcessors']} processors" )
2 processors
In [6]:
allocated_mega_bytes = healthy['hStats']['allocated_bytes'] / 1000 / 1000
print( f"memory {allocated_mega_bytes} megabytes in use" )
memory 11.086176 megabytes in use

試しにポートフォリオを取得してみる

In [7]:
useragent.get_portfolio()
Out[7]:
caption updateAt
code
1306.T TOPIX連動型上場投資信託 2018-04-05T05:30:02+0900
1320.T ダイワ上場投信−日経225 2018-04-05T05:30:05+0900
1321.T 日経225連動型上場投資信託 2018-04-05T05:30:03+0900
1343.T NEXTFUNDS東証REIT指数連動型上場投信 2018-04-05T05:30:35+0900
1346.T MAXIS日経225上場投信 2018-04-05T05:30:44+0900
1348.T MAXISトピックス上場投信 2018-04-05T05:30:43+0900
1540.T 純金上場信託(現物国内保管型) 2018-04-05T05:30:14+0900
1552.T 国際のETFVIX短期先物指数 2018-04-05T05:30:04+0900
1597.T MAXISJリート上場投信 2018-04-05T05:30:42+0900
1699.T NEXTFUNDSNOMURA原油インデックス連動型上場投信 2018-04-05T05:30:13+0900
2206.T 江崎グリコ 2018-04-05T05:30:34+0900
3038.T 神戸物産 2018-04-05T05:30:33+0900
3382.T セブン&アイ・ホールディングス 2018-04-05T05:30:23+0900
3788.T GMOクラウド 2018-04-05T05:30:15+0900
4185.T JSR 2018-04-05T05:30:11+0900
4202.T ダイセル 2018-04-05T05:30:22+0900
4502.T 武田薬品工業 2018-04-05T05:30:37+0900
4568.T 第一三共 2018-04-05T05:30:19+0900
5019.T 出光興産 2018-04-05T05:30:38+0900
5108.T ブリヂストン 2018-04-05T05:30:39+0900
6201.T 豊田自動織機 2018-04-05T05:30:30+0900
6301.T 小松製作所 2018-04-05T05:30:40+0900
6326.T クボタ 2018-04-05T05:30:28+0900
6471.T 日本精工 2018-04-05T05:30:16+0900
6479.T ミネベアミツミ 2018-04-05T05:30:31+0900
6503.T 三菱電機 2018-04-05T05:30:17+0900
6586.T マキタ 2018-04-05T05:30:41+0900
6755.T 富士通ゼネラル 2018-04-05T05:30:21+0900
6770.T アルプス電気 2018-04-05T05:30:32+0900
6911.T 新日本無線 2018-04-05T05:30:27+0900
6952.T カシオ計算機 2018-04-05T05:30:20+0900
6963.T ローム 2018-04-05T05:30:09+0900
6976.T 太陽誘電 2018-04-05T05:30:06+0900
6999.T KOA 2018-04-05T05:30:10+0900
7203.T トヨタ自動車 2018-04-05T05:30:29+0900
7270.T JSR 2018-04-05T05:30:24+0900
7272.T ヤマハ発動機 2018-04-05T05:30:18+0900
8306.T 三菱UFJフィナンシャル・グループ 2018-04-05T05:30:07+0900
8411.T みずほフィナンシャルグループ 2018-04-05T05:30:08+0900
9449.T GMOインターネット 2018-04-05T05:30:25+0900
9531.T 東京瓦斯 2018-04-05T05:30:26+0900

時系列データーを取得してみる

  • 8411.T みずほフィナンシャルグループ
In [8]:
# Yahoo Financeのようなクエリパラメーターでね
code = '8411.T'

HTTP – WEBAPI同期通信路を試す

CSVで取得してみる

通常はJSONで取得を使う。
後々POSTでCSVをアップロードする機能を用意する予定でいる

In [9]:
csv = useragent.get_history_csv( code )
csv
Out[9]:
'at,code,open,high,low,close,close,volume,source\r\n2018-03-07T15:00:00+0900,8411.T,195.3,196.8,194.2,194.2,194.2,141824400,kabu.com\r\n2018-03-08T15:00:00+0900,8411.T,195.2,195.4,193.7,194.1,194.1,110705500,kabu.com\r\n2018-03-09T15:00:00+0900,8411.T,196.2,196.6,193.8,194.1,194.1,171279000,kabu.com\r\n2018-03-12T15:00:00+0900,8411.T,196.6,198.2,196.2,197.8,197.8,113728400,kabu.com\r\n2018-03-13T15:00:00+0900,8411.T,196.7,196.9,195.6,196.8,196.8,110007200,kabu.com\r\n2018-03-14T15:00:00+0900,8411.T,195.8,196.8,195.6,196.3,196.3,93302700,kabu.com\r\n2018-03-15T15:00:00+0900,8411.T,195.5,195.7,194.3,194.9,194.9,113417600,kabu.com\r\n2018-03-16T15:00:00+0900,8411.T,195.1,195.5,194.7,194.7,194.7,107815400,kabu.com\r\n2018-03-19T15:00:00+0900,8411.T,194.5,194.5,192.5,193.0,193.0,137255900,kabu.com\r\n2018-03-20T15:00:00+0900,8411.T,192.5,193.9,192.1,193.5,193.5,103270200,kabu.com\r\n2018-03-22T15:00:00+0900,8411.T,192.4,193.4,192.0,193.3,193.3,133823100,kabu.com\r\n2018-03-23T15:00:00+0900,8411.T,190.6,191.6,190.2,191.2,191.2,199884200,kabu.com\r\n2018-03-26T15:00:00+0900,8411.T,190.0,191.0,189.2,191.0,191.0,217140200,kabu.com\r\n2018-03-27T15:00:00+0900,8411.T,193.0,195.0,192.8,195.0,195.0,187622200,kabu.com\r\n2018-03-28T15:00:00+0900,8411.T,189.1,192.7,189.1,192.6,192.6,142174400,kabu.com\r\n2018-03-29T15:00:00+0900,8411.T,193.7,194.0,190.0,191.5,191.5,106165900,kabu.com\r\n2018-03-30T15:00:00+0900,8411.T,192.2,193.1,190.9,191.4,191.4,94836300,kabu.com\r\n2018-04-02T15:00:00+0900,8411.T,191.4,192.5,190.3,190.3,190.3,71542000,kabu.com\r\n2018-04-03T15:00:00+0900,8411.T,189.0,190.0,188.1,189.9,189.9,104653400,kabu.com\r\n'

改行コードCRLFのCRを消してLFにしてからpandas.read_csvで読む

In [10]:
sio = StringIO( csv.replace('\r', '') )
ohlcvC = pd.read_csv( sio ).loc[:,['at','code','open','high','low','close','volume']].set_index('at')
ohlcvC
Out[10]:

JSONで取得してみる

通常はこれを使う

In [11]:
json = useragent.get_history_web( code )
json
Out[11]:

情報の整理

In [12]:
ohlcvW = json.loc[:,['code','open','high','low','close','volume']]
ohlcvW
Out[12]:

ZeroMQ – PUB / SUB非同期通信路を試す

リクエストをサーバーに送る

In [13]:
useragent.req_history_zmq( code )
Out[13]:
<Response [204]>

HTTPで返す情報が無いので204 NoContentが返ってくる。

In [14]:
topic, df = useragent.read_pub_zmq()

ZeroMQのPUB / SUB通信でトピックとpandas DataFrameを得る。
このメソッドは通信終了までブロックする。

In [15]:
topic
Out[15]:
b''
In [16]:
df
Out[16]:

同じく情報の整理

In [17]:
ohlcvZ = df.loc[:,['at','code','open','high','low','close','volume']].set_index('at')
ohlcvZ
Out[17]:
In [18]:
del useragent

ZeroMQによるメッセージングが働くので、スレッドで並列実行してみる

In [19]:
run_threads = 10
In [20]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def ccjob(number):
    topic = b'' 
    ua = UserAgent( topic )
    ua.subscribe_zmq()
    topic, df = ua.read_pub_zmq()
    df = df.loc[:,['at','code','open','high','low','close','volume']].set_index('at')
    return df

ohlcvThreads = []
with ThreadPoolExecutor() as executor:
    ohlcvThreads = executor.map( ccjob, range(run_threads) )

ここで停止する(ブロッキング関数をよんでいるから)ので
以下のリンクをクリックして出版依頼APIを呼ぶ
http://localhost:8739/publish/?code=8411.T
このページの内容はありません。(204 NoContent)

すべての情報が一致することを確認する

In [21]:
import hashlib

対象リスト

In [22]:
df_under_test  = [ohlcvC, ohlcvW, ohlcvZ] +  list(ohlcvThreads)

DataFrameをmd5チェックサムにする

In [23]:
def checksum( x ):
    return hashlib.md5( x.to_msgpack() ).hexdigest()

dut = [checksum(x) for x in df_under_test]
dut
Out[23]:
['201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13',
 '201a22c41204376c161a5b682216fb13']

先頭要素と全要素の一致を確認する

In [24]:
cond = [x == dut[0] for x in dut ]
cond
Out[24]:
[True, True, True, True, True, True, True, True, True, True, True, True, True]

すべてTrueであること

In [25]:
c = all(cond)
assert c, "!"
c
Out[25]:
True

結論

すべて同じリクエストだから同じ情報が得られて当然でした。

コメントを残す

メールアドレスが公開されることはありません。