てけとーぶろぐ。

ソフトウェアの開発と、お絵かきと、雑記と。

Pythonでビットコインの自動取引をシミュレーションする

前回までで、実際のビットコインの価格データを収集したり、その価格データをグラフ化したりしました。

今回はゴールデンクロスを使った簡単な取引のアルゴリズムをつくり取引をシミュレーションしてみます。

シミュレーションの概要

シミュレーションはシミュレーターという仮想の取引所の中である期間仮想のトレーダーに取引させて行うこととします。
そして開始時と終了時でトレーダーがもつ資産がどう変わったか等を見てアルゴリズムがうまく働いているかを確認します。

今回行うシミュレーションでは実装を簡単にするために実際の取引からいくらか単純化します。

シミュレーションの概要を以下に示します。

  • トレーダーはファイルに記録された価格データのタイムスタンプのタイミングで売買の判断と売買を行う。例えば以前作成した ticker_20200514.csv であれば 2020-05-06T10:15:39Z から 2020-05-13T06:00:00Z までの間で約3秒おきに行う。
  • シミュレーション開始時のトレーダーの資産は200万円。
  • シミュレーション終了時にトレーダーがビットコインを持っていたらbidの価格で円に換算して終了時の資産を計算する。
  • トレーダーのビットコインの売買単位は1単位のみ。
  • トレーダーが同時に持てるビットコインは1単位のみ。
  • 売買は即座にそのときの価格で約定することとする。

設計

以下のようなクラス構成を考えました。

Simulatorクラス

  • 仮想の取引所。この中でTraderが特定の時間間隔で行動する。
  • Traderに現在のビットコインの価格を提示する。
  • シミュレーションの結果を表示する。
  • トレーダーからの売買注文の受付はより詳細なシミュレーションには必要だが今回は省略。

Traderクラス

PriceDataManagerクラス

  • 価格データを蓄積する。
  • 蓄積した価格データのうち直近の指定秒間の価格データを使って移動平均値を計算する。


クラス間の所有関係を図にすると以下のようになります。

f:id:kurimayoshida:20200627080004p:plain
クラス間の関係

メインの処理の作成

Simulatorクラスのインスタンスのsimulate()を実行するだけです。
simulate()がシミュレーションを行って結果を表示してくれるはずです。

from simulator import Simulator


if __name__ == '__main__':

    simulator = Simulator()
    simulator.simulate()

Simulatorクラスの作成

Pythonでビットコインの価格を取得する(2) - てけとーぶろぐ。で作成した価格データのCSVファイルを読み込み、先頭のデータから1行よんではその価格をシミュレーション上の今の価格としてトレーダーを行動させます。
トレーダーは行動する中でSimulatorクラスのget_cur_ticker()メソッドを呼んで価格を取得するのでシミュレーション上の今の価格を取得することになります。

from api import Ticker
import pandas as pd


class Simulator:
    def __init__(self):
        self.cur_ticker = None

    def simulate(self):
        from trader import Trader
        self.trader = Trader(self)
        self.trader.yen_amount = 2000000
        self.trader.btc_amount = 0

        print(f'開始 資産: {self.trader.yen_amount}')

        # CSVファイルを読み込む
        df = pd.read_csv('ticker_20200514.csv')
        df['timestamp'] = pd.to_datetime(df['timestamp'])

        # 一行ずつ処理
        for _, row in df.iterrows():
            self.cur_ticker = Ticker(row['ask'], row['bid'], row['high'], row['last'],
                    row['low'], row['symbol'], row['timestamp'], row['volume'])
            self.trader.act()

        # 全行処理が終わったら結果を表示する
        print(f'終了 資産: {self.trader.yen_amount + self.trader.btc_amount * self.cur_ticker.bid_price}')

    def get_cur_ticker(self) -> Ticker:
        return self.cur_ticker

Traderクラスの作成

act()メソッドがシミュレーター内で一定仮想時間ごとに実行されます。
現在の状態(ビットコインを持っているか否か)に応じて売買の判断をします。
具体的には、ビットコインを持っていないときは1分の移動平均線と10分の移動平均線ゴールデンクロスで買いの判断をします。ビットコインを持っているときは買いから5時間経過するか、買いから1%価格が上がるか、0.5%価格が下がるかしたら売りの判断をします。

from enum import Enum, auto
from simulator import Simulator
from price_data_manager import PriceDataManager
import datetime


class TradingStatus(Enum):
    # 買い発注待ち
    STANDING_BY_FOR_BUYING = auto()
    # 売り発注待ち
    STANDING_BY_FOR_SELLING = auto()


class SellingType(Enum):
    DEADLINE = auto()
    PROFIT_TAKING = auto()
    STOP_LOSS = auto()


class Trader:
    SHORT_MA_PERIOD_SEC = 1 * 60
    LONG_MA_PERIOD_SEC = 10 * 60

    MAX_HOLDING_SEC = 5 * 60 * 60
    SELLING_TH_PRICE_RATE = 1.01
    STOP_LOSS_SELLING_TH_PRICE_RATE = 0.995

    def __init__(self, simulator: Simulator) :
        # TODO Simulator ではなくAPIを渡してAPIの通信先が
        # Simulator でも GMOコインのAPI でも同じように扱えるようにする

        self.state = TradingStatus.STANDING_BY_FOR_BUYING
        self.simulator = simulator

        # 資産 ----

        self.yen_amount = 0
        self.btc_amount = 0

        # 取引判断用データ ----

        self.price_data_manager = PriceDataManager()

        # 前回の移動平均
        self.last_long_ma_price = None
        self.last_short_ma_price = None

        self.bought_price = None
        self.bought_datetime = None


    def act(self):
        ticker = self.simulator.get_cur_ticker()
        self.price_data_manager.add_ticker(ticker)

        short_ma_price = self.price_data_manager.get_moving_average_ask_price(self.SHORT_MA_PERIOD_SEC)
        long_ma_price = self.price_data_manager.get_moving_average_ask_price(self.LONG_MA_PERIOD_SEC)

        if self.state == TradingStatus.STANDING_BY_FOR_BUYING:
            # ゴールデンクロスのチェック
            if (self.last_short_ma_price is not None and 
                self.last_short_ma_price < self.last_long_ma_price and 
                short_ma_price > long_ma_price):

                # 買い
                # 簡易的に今の価格で買えたこととする
                self.yen_amount -= ticker.ask_price
                self.btc_amount += 1
                self.state = TradingStatus.STANDING_BY_FOR_SELLING
                self.bought_price = ticker.ask_price
                self.bought_datetime = ticker.timestamp
                print(f'{ticker.timestamp} 買い {ticker.ask_price}')

        elif self.state == TradingStatus.STANDING_BY_FOR_SELLING:
            # 時間経過、指定の割合上昇、指定の割合下降で売る
            selling_type = None
            if ticker.timestamp >= self.bought_datetime + datetime.timedelta(seconds=self.MAX_HOLDING_SEC):
                selling_type = SellingType.DEADLINE
            if ticker.bid_price >= self.bought_price * self.SELLING_TH_PRICE_RATE:
                selling_type = SellingType.PROFIT_TAKING
            if ticker.bid_price <= self.bought_price * self.STOP_LOSS_SELLING_TH_PRICE_RATE:
                selling_type = SellingType.STOP_LOSS

            if selling_type:
                # 売り
                # 簡易的に今の価格で売れたこととする
                self.yen_amount += ticker.bid_price
                self.btc_amount -= 1
                self.state = TradingStatus.STANDING_BY_FOR_BUYING

                if selling_type == SellingType.DEADLINE:
                    action = '売り(期限)'
                elif selling_type == SellingType.PROFIT_TAKING:
                    action = '売り(利確)'
                elif selling_type == SellingType.STOP_LOSS:
                    action = '売り(ロスカット)'
                print(f'{ticker.timestamp} {action} {ticker.bid_price}')

        self.last_short_ma_price = short_ma_price
        self.last_long_ma_price = long_ma_price

PriceDataManagerクラスの作成

add_ticker()メソッドで価格データを蓄積しget_moving_average_ask_price()メソッドやget_moving_average_bid_price()メソッドで蓄積した価格データを使ってaskやbidの価格の移動平均値を求めます。

from api import Ticker
from collections import deque
import datetime

class PriceDataManager:
    MAX_MOVING_AVERAGE_PERIOD_SEC = 10 * 60

    _PRICE_TYPE_ASK = 'ask'
    _PRICE_TYPE_BID = 'bid'

    def __init__(self) :
        self.ticker_queue = deque()

    def add_ticker(self, ticker: Ticker) :
        self.ticker_queue.append(ticker)
        # Tickerが増えすぎないように指定期間より過去のTickerは削除する
        th_datetime = ticker.timestamp - datetime.timedelta(seconds=self.MAX_MOVING_AVERAGE_PERIOD_SEC)
        while self.ticker_queue[0].timestamp < th_datetime:
            self.ticker_queue.popleft()

    def get_moving_average_ask_price(self, period_sec) -> float:
        return self._get_moving_average_price(period_sec, self._PRICE_TYPE_ASK)

    def get_moving_average_bid_price(self, period_sec) -> float:
        return self._get_moving_average_price(period_sec, self._PRICE_TYPE_BID)

    def _get_moving_average_price(self, period_sec, price_type) -> float:
        th_datetime = self.ticker_queue[-1].timestamp - datetime.timedelta(seconds=period_sec)
        num = 0
        sum = 0.0
        for ticker in reversed(self.ticker_queue):
            if th_datetime < ticker.timestamp:
                num += 1
                if price_type == self._PRICE_TYPE_ASK:
                    sum += ticker.ask_price
                elif price_type == self._PRICE_TYPE_BID:
                    sum += ticker.bid_price
        return sum / num

プログラムの実行

プログラムを実行してみます。

> python .\main.py
開始 資産: 2000000
2020-05-06 10:44:33+00:00 買い 981000
2020-05-06 10:49:15+00:00 売り(ロスカット) 978350
2020-05-06 10:53:48+00:00 買い 979990
2020-05-06 13:23:30+00:00 売り(ロスカット) 975000
2020-05-06 13:31:15+00:00 買い 977130
2020-05-06 18:31:15+00:00 売り(期限) 978670
2020-05-06 18:31:33+00:00 買い 980520
2020-05-06 18:41:56+00:00 売り(ロスカット) 978020
2020-05-06 18:48:15+00:00 買い 979990
2020-05-06 23:48:18+00:00 売り(期限) 985000
2020-05-07 00:07:57+00:00 買い 971000
2020-05-07 02:39:57+00:00 売り(利確) 990969
…
2020-05-13 05:44:57+00:00 買い 955187
2020-05-13 05:53:02+00:00 売り(ロスカット) 950501
終了 資産: 1616646

買いと売りを繰り返して…
買いのタイミングは前回のチャートと見比べるとゴールデンクロスのタイミングになっていそうです。
うーん、なんかロスカットがおおいかな…。
結果…200万円が161万円に!

…改良が必要ですね。シミュレーションしてよかった!と前向きにとらえましょう…。

Pythonでビットコインの価格をグラフ化する

前回収集した価格データをグラフ化して、自動売買のアルゴリズムを考えてみましょう。

価格データのグラフ化

前回収集した価格データのCSVファイルが以下です。

ticker_20200514.zip

こちらをPythonプログラムでグラフ化してみます。
グラフの描画にはnumpyパッケージ, matplotlibパッケージを使います。
またCSVファイルの読み込み等にpandasパッケージを使います。

それぞれをインストールします。

> pip install matplotlib
> pip install pandas

matplotlibを使ったグラフの描画については以下のサイトを参考にしました。

note-tech.com
note-tech.com

全体のコードは以下になります。

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


if __name__ == '__main__':

    df = pd.read_csv('ticker_20200514.csv')

    df['timestamp'] = pd.to_datetime(df['timestamp'])

    START_TIMESTAMP = '2020-05-06 10:00'
    END_TIMESTAMP = '2020-05-06 18:00'
    SAMPLING_PERIOD_SEC = 3
    SHORT_MA_PERIOD_SEC = 1 * 60
    LONG_MA_PERIOD_SEC = 10 * 60

    # データの期間を絞る
    df = df[(df['timestamp'] >= START_TIMESTAMP) & (df['timestamp'] < END_TIMESTAMP)]

    # 移動平均の列を作成
    df['askShortMa'] = df['ask'].rolling(SHORT_MA_PERIOD_SEC // SAMPLING_PERIOD_SEC, min_periods=1).mean()
    df['askLongMa'] = df['ask'].rolling(LONG_MA_PERIOD_SEC // SAMPLING_PERIOD_SEC, min_periods=1).mean()

    # グラフ描画
    fig, ax = plt.subplots()
    ax.plot(df['timestamp'], df['ask'], label='ask')
    ax.plot(df['timestamp'], df['askShortMa'], label='ask ma 1 min')
    ax.plot(df['timestamp'], df['askLongMa'], label='ask ma 10 min')
    ax.legend(fontsize=16)
    ax.set_xlabel('timestamp', fontsize=16)
    ax.set_ylabel('¥', fontsize=16)
    ax.set_title('Ticker', fontsize=16)
    plt.show()

上記サイトから幾つか変更点があります。

まず全データを描画すると時間がかかるためデータを絞りました。
以下のコードで指定の期間分に絞ったDataFrameを作成しています。

    # データの期間を絞る
    df = df[(df['timestamp'] >= START_TIMESTAMP) & (df['timestamp'] < END_TIMESTAMP)]

また移動平均の求め方について
上記サイトではnumpyのconvolve()を使っていますが
pandasでできる範囲はpandasでということで以下のサイトを参考にpandasのrolling()で移動平均を求めています。

note.nkmk.me

以下のコードになります。
「min_periods=1」として先頭の方でデータが足りなくても1つ以上データがあればある分だけで平均値を求めるようにしています。

    # 移動平均の列を作成
    df['askShortMa'] = df['ask'].rolling(SHORT_MA_PERIOD_SEC // SAMPLING_PERIOD_SEC, min_periods=1).mean()
    df['askLongMa'] = df['ask'].rolling(LONG_MA_PERIOD_SEC // SAMPLING_PERIOD_SEC, min_periods=1).mean()

実行すると以下のグラフが描画されます。

f:id:kurimayoshida:20200607160330p:plain

自動取引のアルゴリズムを考える

グラフを眺めてみるとどうでしょうか?
移動平均の期間を変えたり、表示する期間を変えたりしてもいいかと思います。
1分の移動平均線が10分の移動平均線を下から突き抜けるとある程度価格が上昇しているように見えます。
いわゆるゴールデンクロスですね。

参考: 「ゴールデンクロス」「デッドクロス」とは? | お金のキャンパス

このゴールデンクロスを買いシグナルとする自動取引のアルゴリズムでいくことにしましょう。
分析と言うには程遠い感じですしゆくゆくは機械学習の学習結果を使った自動取引などにも挑戦したいとは思うのですが
まずは実現しやすいものでどの程度の成績がでるか試してみます。


次回はこのゴールデンクロスを使った売買のアルゴリズムをつくり
売買をシミュレーションしてみます。

Pythonでビットコインの価格を取得する(2)

前回でビットコインの現在の価格を取得してコンソールに出力することができました。

自動取引の判断材料に使ったり、自動取引のシミュレーションをしたりするためには
現在の価格だけでなく、ある程度の期間の価格が必要になります。
これは現在の価格を取得してそれを記録することを一定期間行えばいいわけです。

記録の方法としてはファイルに書き込んでファイルに記録するのが簡単かと思います。
ですがここでは記録した価格データへのアクセスのしやすさを考えてデータベースに記録してみましょう。

データベースにも色々種類がありますが扱いが簡単で記録するデータのフォーマットも柔軟に変えられるMongoDBを使うことにします。

MongoDBのインストール(Windows PC)

Download Center: Community Server | MongoDB
から

  • Version: 4.2.7 (current release)
  • OS: Windows x64
  • Package: ZIP

を選択してDownloadをクリック。
ダウンロードした「mongodb-win32-x86_64-2012plus-4.2.7.zip」を
適当なディレクトリーに展開します。
(例: C:\MongoDB\mongodb-win32-x86_64-2012plus-4.2.6)

mongodの起動と終了

MongoDBデーモンであるmongodを起動してみます。
DBへデータを読み書きする場合は
あらかじめこのmongodを起動しておく必要があります。

コマンドプロンプトを起動して
カレントディレクトリーをMongoDBインストール先のbinディレクトリーにします。

> cd C:\MongoDB\mongodb-win32-x86_64-2012plus-4.2.7\bin

binディレクトリー内のmongod.exeを実行します。
実行の際にはデータディレクトリーの指定ができます。
ここでは「mongodb-win32-x86_64-2012plus-4.2.7」内の「data」ディレクトリーをデータディレクトリーとします。
データディレクトリーとして指定するディレクトリーはあらかじめ作成しておく必要があるので作成してから実行します。

> mongod --dbpath ../data

これでmongodが起動しました。

終了する場合は起動したコマンドプロンプト上でCtrl+Cキーを押します。

pymongoのインストール

PythonからMongoDBにアクセスするために
pymongoパッケージをインストールします。

Raspbianで簡単にインストールできるMongoDBのバージョンが2.4で
MongoDB 2.4 に対応した一番新しいpymongoのバージョンは3.5.1のようです。

PCで動かしていたプログラムがそのままRaspberry Pi(Raspbian OS)でも使えるよう
pymongoのバージョンを合わせるようにします。
つまりpymongo 3.5.1をバージョン指定してインストールします。

> pip install pymongo==3.5.1
MongoDBへの記録

ビットコインの現在の価格を取得してMongoDBに書き込むプログラムを作成します。
前回作成したビットコインの価格をコンソールに出力するプログラムをベースに
コンソールに出力する部分をMongoDBへの書き込みに書き換えて作成します。

前回作成したプログラム「main_print.py」を「main_mongo.py」という名前でコピーします。
主にこれのupdate()関数を書き換えていきます。
「main_mongo.py」の全体は以下のコードになります。

from api import GmoCoinApi
from apscheduler.schedulers.background import BackgroundScheduler
from time import sleep
import json
from pymongo import MongoClient


gmo_coin_api = GmoCoinApi('https://api.coin.z.com')

client = MongoClient('mongodb://localhost:27017/')
db = client['coin']


def update():
    global is_exception_raised
    try:
        ticker = gmo_coin_api.get_ticker('BTC')
        print(ticker)
        post = {
            'ask': ticker.ask_price,
            'bid': ticker.bid_price,
            'high': ticker.high_price,
            'last': ticker.last_price,
            'low': ticker.low_price,
            'symbol': ticker.symbol,
            'timestamp': ticker.timestamp,
            'volume': ticker.volume,
        }
        db['ticker'].insert_one(post)
    except Exception as exception:
        print(exception)
        is_exception_raised = True


if __name__ == '__main__':
    
    scheduler = BackgroundScheduler()
    scheduler.add_job(update, trigger='interval', seconds=3, max_instances=10)
    scheduler.start()

    is_exception_raised = False

    try:
        while True:          
            if is_exception_raised:
                break

            sleep(0.001)

    except KeyboardInterrupt:
        pass

    scheduler.shutdown()

以下のコードでMongoDBにアクセスするためのMongoClientクラスのインスタンスを作成しています。

client = MongoClient('mongodb://localhost:27017/')

引数は接続先のホスト名です。
このプログラムを実行しているのと同じマシン上で
mongodを起動するので「localhost」になります。

続く以下のコードで指定の名前のデータベースを表す
Databaseクラスのインスタンスを取得します。

db = client['coin']

「coin」という名前のデータベースを使うことにします。
データベースはあらかじめ作成していなくても
存在しない場合は作成されるので問題ありません。

データベースへのデータの書き込みはDatabaseクラスのインスタンスを使って行います。
以下のコードで「post」を「ticker」というコレクションに書き込んでいます。

        db['ticker'].insert_one(post)

DB内のデータの構成を簡単に説明すると
coinデータベース内に、tickerコレクションがあり、tickerコレクション内に
「_id」「ask」「bid」「high」「last」「low」「symbol」「timestamp」「volume」
といったフィールドを持つ価格データのドキュメント(オブジェクト)が複数格納されているという構成になります。
「_id」フィールドは自動的に作られる、オブジェクトのIDを持つフィールドです。

実行してみましょう。
実行にあたってはmongodを起動しておくことを忘れないでください。

mongo shellでデータの確認

プログラムの実行の結果DBにデータが書き込まれたか確認してみましょう。

Pythonプログラムからpymongoを使ってDBのデータを取得して確認してもいいのですが
簡単な確認であればmongo shellを使うのが楽です。

まずmongodを起動しておきます。

新しいコマンドプロンプトを起動しmongo shellを起動します。

> mongo

起動するとMongoDBに対するコマンドを受け付ける状態となります。
ここにデータ取得のコマンド等を入力してデータを確認したりします。

mongo shellを終了する場合はCtrl+Cキーを押します。

データの確認のために必要なコマンドを紹介します。

データベース一覧の表示

> show dbs

使用するデータベースの指定
データベースの削除や、データベース内のコレクションからドキュメントを検索したりするためにはあらかじめこのコマンドで使用するデータベースを指定しておく必要があります。

> use coin

※「coin」データベースの場合

データベースの削除

> use coin
> db.dropDatabase() 

※「coin」データベースの場合

データベース内のコレクション一覧の表示

> use coin
> show collections

※「coin」データベースの場合

データベース内のコレクションの削除

> use coin
> db.ticker.drop()

※「coin」データベース、「ticker」コレクションの場合

コレクション内のドキュメント一覧の表示

> use coin
> db.ticker.find()

※「coin」データベース、「ticker」コレクションの場合

コレクション内の最新X件のドキュメントの表示

> use coin
> db.ticker.find().sort({_id: -1}).limit(10);

※「coin」データベース、「ticker」コレクション、最新10件の場合

参照: https://docs.mongodb.com/manual/mongo/

さて、DBにデータが書き込まれたか確認できたでしょうか?

MongoDBのインストール(Raspberry Pi)

作成したプログラムをRaspberry Piでも動かしてみましょう。
そのためにはRaspberry PiにもMongoDBをインストールする必要があります。

Raspberry Piの場合はコマンド1つでMongoDBをインストールできます。

$ sudo apt-get install mongodb-server

またRaspberry Piの場合はデーモンとしてインストールされるので
インストールしたら以降Raspberry Piを起動すると自動的にWindowsでいうところの「mongod」が起動した状態となり「mongod」起動の操作は不要です。
その他はPCと特に変わりません。

無事Raspberry Piで実行できたらしばらく起動しっぱなしにして
価格データを収集してみましょう。

起動しっぱなしにする際は
Pythonで株価アラートをつくる(4) - てけとーぶろぐ。
で紹介したバックグラウンドでの実行を使ってください。

また長時間起動しっぱなしにしてみると
例えば取引所のシステムメンテナンスが入って
エラーが起きて停止してしまう
といったことも起こることが分かるかと思います。

エラーが起きて停止したことを知るために
Pythonで株価アラートをつくる(1) - てけとーぶろぐ。
でつくったメール通知も組み込むと簡単に停止を知ることができます。

CSVファイルへのエクスポート

価格データの書き込みや検索を行わず
単にある期間の価格データを読み込みたいだけといった用途には
価格データはDBに入っているよりもCSVファイルなどになっていたほうが扱いやすいということもありますね。
例えば価格の推移のグラフを描画するプログラムで価格データを使う場合などです。

MongoDBにはDB内のデータをCSVファイル等にエクスポートするツールが付属しています。

例としてcoinデータベースのtickerコレクションの各フィールドをticker.csvとしてMongoDBのインストール先のディレクトリー(binディレクトリーの親ディレクトリー)に出力してみましょう。

> cd C:\MongoDB\mongodb-win32-x86_64-2012plus-4.2.7\bin
> mongoexport --db coin --collection ticker --type=csv --fields _id,ask,bid,high,last,low,symbol,timestamp,volume --out ../ticker.csv

Raspberry Piの場合はMongoDB 2.4でオプションが少し異なり「--type=csv」ではなく「--csv」になります。

$ mongoexport --db coin --collection ticker --csv --fields _id,ask,bid,high,last,low,symbol,timestamp,volume --out ticker.csv

サンプルとして何日間か分のビットコインの価格をエクスポートしたCSVを以下にアップロードしました。
ticker_20200514.zip

次回は収集した価格データをグラフ化して分析し、自動売買のアルゴリズムを考えてみたいと思います。

Pythonでビットコインの価格を取得する(1)

Raspberry Pi + Pythonビットコイン自動取引」と銘打って書きはじめ
要素技術を身につけるための株価アラートの作成を終え
いよいよビットコインを扱うのですがそれにあたりまえがきを少し。

やりたいことは自動売買

Raspberry PiPythonビットコインも実は結果的にそれらを選ぶことになったもので
やりたいことは自作プログラムでの自動売買なんです。

プログラミングをしたことがあって、何らか投資をしたことがあるという人だったら
誰もが一度は夢見ることではないでしょうか?

自分は10年以上前からできたらいいなと思っていました。
それが最近になってようやく敷居をさげてくれるものが出てきていよいよ実現できるのではないかと思っています。

あたらめてコンセプト

Raspberry Pi + Pythonビットコイン自動取引」のコンセプトをあらためて載せておきます。

  • 高実現性

実現しなくては意味がありません。なんとかして実現します。実現させるために方法を選びます。

  • 低コスト

趣味で、あわよくば利益をというものなのに、コストがかかりすぎても仕方ありませんので…。

この2つをコンセプトとします。

なぜRaspberry Pi?なぜPython? なぜビットコイン

なぜRaspberry Piなのかについては前回まででお分かりいただけたかと思います。
なぜPythonなのかは実装のしやすさからです。
そしてなぜビットコインなのか。
ビットコインはその生まれからか、新興の投資対象であるからか、国内の取引所に自作プログラムから実行可能なAPIを用意しているところがいくつかあります。そこが理由です。
例えば国内株式についてはそのような仕組みを用意している証券会社は見当たりませんでした。
これがないと価格取得や発注のプログラムから用意しないといけないですし、その運用・保守もしなければいけなくなります。
そしてそれが今まで自分が自動取引を断念してきた主な理由でした。

要は自作プログラムでの自動売買を低コストで実現するためのRaspberry PiでありPythonでありビットコインです。

取引所を選ぶ

早速取引所を選びます。

  • 自作プログラムから実行可能なAPI
  • 国内の取引所
  • 安定性が期待できる有名所
  • 安い手数料

あたりからGMOコインを使うことにします。

coin.z.com

そうときまったら口座開設しましょう。

GMOコインのAPIを使う

早速GMOコインのAPIを使ってみましょう。
GMOコインには親切なAPIドキュメントがあり
Pythonでのサンプルコードも用意されています。

APIドキュメント| GMOコイン


GMOコインのAPIにはPublic APIとPrivate APIがあり
Public APIであれば口座開設せずとも実行することができます。
逆にPrivate APIは口座開設を行ってから得られるAPIキーとAPIシークレットがなければ実行することはできません。

幸いビットコインの価格を得るAPI「GET /public/v1/orderbooks」はPublic APIなので口座開設せずとも実行することができます。

API「GET /public/v1/orderbooks」のドキュメントは以下です。
APIドキュメント| GMOコイン

ドキュメントにあるサンプルの通りで十分ではありますが
今後を考えてAPIを実行するクラスを作っておきます。
結果もdataclassで返すようにします。
APIを実行し、APIが返してきたJSONに含まれる値をTickerクラスに詰め込んで返しているだけです。

from dataclasses import dataclass
from datetime import datetime
import requests
from datetime_util import DatetimeUtil

@dataclass
class Ticker:
    ask_price: int
    bid_price: int
    high_price: int
    last_price: int
    low_price: int
    symbol: str
    timestamp: datetime
    volume: float

class GmoCoinApi:
    def __init__(self, end_point) :
        self.end_point = end_point
    
    def get_ticker(self, symbol) -> Ticker:
        path = '/public/v1/ticker?symbol=' + symbol
        response = self._send_get_request(self.end_point + path)
        response_json = response.json()

        if response_json['status'] != 0:
            raise RuntimeError('API error.')

        ask_price = int(response_json['data'][0]['ask'])
        bid_price = int(response_json['data'][0]['bid'])

        # highにnullが入ってくることがあったのでその場合はNoneにする        
        response_json_high = response_json['data'][0]['high']
        if response_json_high is None:
            high_price = None
        else:
            high_price = int(response_json_high)

        last_price = int(response_json['data'][0]['last'])

        # lowにnullが入ってくることがあったのでその場合はNoneにする        
        response_json_low = response_json['data'][0]['low']
        if response_json_low is None:
            low_price = None
        else:
            low_price = int(response_json_low)

        symbol = response_json['data'][0]['symbol']
        timestamp = DatetimeUtil.iso_to_datetime(response_json['data'][0]['timestamp'])
        volume = float(response_json['data'][0]['volume'])

        ticker = Ticker(ask_price, bid_price, high_price, last_price, low_price, symbol, timestamp, volume)
        return ticker


    def _send_get_request(self, url, params=None, **kwargs):
        response = requests.get(url, params=params, **kwargs)
        return response


if __name__ == '__main__':

    gmo_coin_api = GmoCoinApi('https://api.coin.z.com')
    ticker = gmo_coin_api.get_ticker('BTC')
    print(ticker)


この中で使っている DatetimeUtil.iso_to_datetime() は
APIが返してくるISO 8601形式の時刻をdatetimeに変換するユーティリティメソッドで以下がソースコードです。

import datetime

class DatetimeUtil(object):

    @staticmethod
    def iso_to_datetime(iso: str) -> datetime:
        if iso is None:
            return None
            
        dt = datetime.datetime.fromisoformat(iso.replace('Z', '+00:00'))
        if (dt.tzinfo is not None) and (dt.tzinfo.utcoffset(dt) is not None):
            return dt
        else:
            return (dt.astimezone())

    @staticmethod
    def datetime_to_iso(dt: datetime) -> str:
        if dt is None:
            return None

        if (dt.tzinfo is not None) and (dt.tzinfo.utcoffset(dt) is not None):
            if dt.tzinfo == datetime.timezone.utc:
                return dt.isoformat().replace('+00:00', 'Z')
            else:
                return (dt.astimezone(datetime.timezone.utc)).isoformat().replace('+00:00', 'Z')
        else:
            return (dt.astimezone(datetime.timezone.utc)).isoformat().replace('+00:00', 'Z')

実行してみましょう。
実行にあたっては「requests」のパッケージが必要なので
未インストールの場合はインストールします。

> pip install requests

価格が取得できましたね。

時々刻々と変化する価格を取得するためには定期実行することになります。
株価アラートと同様にapschedulerを使って3秒おきに取得してみます。

from api import GmoCoinApi
from apscheduler.schedulers.background import BackgroundScheduler
from time import sleep
import json


gmo_coin_api = GmoCoinApi('https://api.coin.z.com')


def update():
    global is_exception_raised
    try:
        ticker = gmo_coin_api.get_ticker('BTC')
        print(ticker)
    except Exception as exception:
        print(exception)
        is_exception_raised = True


if __name__ == '__main__':
    
    scheduler = BackgroundScheduler()
    scheduler.add_job(update, trigger='interval', seconds=3, max_instances=10)
    scheduler.start()

    is_exception_raised = False

    try:
        while True:          
            if is_exception_raised:
                break

            sleep(0.001)

    except KeyboardInterrupt:
        pass

    scheduler.shutdown()

実行してみます。
実行にあたっては「apscheduler」のパッケージが必要なので
未インストールの場合はインストールします。

> pip install apscheduler

こんなふうにしてビットコインの価格を取得して、あとは、蓄積して、分析して、売買アルゴリズムを作っていこうと思います。

次回は取得した価格データを分析のためにデータベースに保存してみましょう。

Pythonで株価アラートをつくる(4)

今回は前回までで作成した株価アラートプログラムをRaspberry Pi上で動かしていきます。

Raspberry Piの用意

Raspberry Piには幾つもモデルがあります。何を選ぶのがいいでしょうか。
最新モデルは Raspberry Pi 4 ですが「Raspberry Pi Raspberry Pi 3 Model B」「Raspberry Pi Raspberry Pi 2 Model B」あたりが価格も落ち着いていておすすめです。

jp.rs-online.com

jp.rs-online.com

性能もそれなりですが今回の用途には問題ないはずです。

Raspberry Pi Zeroは価格も消費電力も魅力的ですが経験上性能が十分ではないので今回の用途にはおすすめできません。

安定稼働のためにイーサネット接続(有線LAN接続)もできたほうがいいのでその点でもRaspberry Pi Zeroは候補から外れます。

アクセサリーの用意

電源
電源はアンペア数が十分なものを選びましょう。以下がおすすめです。

SDカード
容量はひとまず8GBもあれば十分です。あとは、ブランド、値段、読み書きの速度を見て選びます。
読み書きの速度についてはUHSスピードクラス1、SDスピードクラス10で、読み出し20MB/s もあれば十分です。
1,000円もしないと思います。例えば以下がおすすめです。

その他
お使いのルーターイーサネットケーブルの差込口があること、つまりRaspberry Piを有線LAN接続できることをご確認ください。
接続のためのイーサネットケーブルも必要になります。ルーターのすぐそばにRaspberry Piも設置するならば短いものを。
個人的に薄いケーブルが取り回しが良くておすすめです。例えば以下。

ルーターイーサネットケーブルの差込口がなくwifi接続にする場合は「wifi接続にする場合は」を参照してください。

OS(Raspbian)のインストール

Download Raspbian for Raspberry Pi
より「2020-02-13-raspbian-buster-lite.zip」をダウンロードします。
zipファイルを展開してできた「2020-02-13-raspbian-buster-lite.img」をSDカードに書き込みます。

imgファイルのSDカードへの書き込みにはWin32 Disk Imagerを使います。
Win32 Disk Imager download | SourceForge.net
からダウンロード、インストールします。

SDカードをSDカードリーダーにセットし、Win32 Disk Imagerを起動。
Image Fileとして「2020-02-13-raspbian-buster-lite.img」を指定して
Deviceとして書き込み先のSDカードのドライブを指定して「Write」ボタンをクリックします。

SSH接続の許可

Raspberry Piの操作はPCからSSH接続して行います。
Raspberry PiSSH接続ができるように、SSH接続を許可します。

具体的にはimgファイルを書き込んだSDカードのルートディレクトリー内に「ssh」という名前の空のファイルを作成します。

参照: Enable SSH when creating SD card - Raspberry Pi Forums

コマンドプロンプトで行う場合はコマンドプロンプトを開きまずカレントドライブをSDカードのドライブにします。
「D」ドライブであれば以下の通り。

> D:
> cd \

ファイルを確認します。

> dir

「overlays」というディレクトリーなどがあるはずです。
そこで「ssh」という名前の空のファイルを作成します。

> type nul > ssh
有線LAN接続

自宅のルーターRaspberry PiEthernetケーブルで接続します。

(参考)wifiで接続する場合

wifi接続(無線LAN接続)したい場合はRaspberry PiにUSBキーボード、マウス、ディスプレイを接続し、デスクトップPCのようにして設定してしまうのが簡単です。
(Raspberry Pi Raspberry Pi 2 Model B の場合にはUSB無線LANアダプターも必要です。)
その場合OSはGUIで設定するために「Raspbian Buster Lite」ではなく「Raspbian Buster with desktop」を選択します。
Ethernetケーブル接続の代わりに以下のサイトなどを参考にwifiでネットワーク接続します。
【STEP-07】ラズベリーパイを無線LAN(WiFi)でインターネット接続│FABSHOP.JP -デジタルでものづくり! ファブショップ !

一度接続すれば以降起動時に自動接続されるようになります。

Raspberry Piの起動

Raspberry PiにSDカードを挿し、電源ケーブルを接続すれば起動します。
電源スイッチ付きのケーブルの場合は電源スイッチをONにしてください。

SSHログイン

Windows 10 バージョン1803以降であればsshコマンドが使えます。
このsshコマンドを使ってRaspberry PiSSH接続、ログインします。
コマンドプロンプトから以下のコマンドを実行します。

> ssh -l pi raspberrypi.local

「pi」はRaspbian標準のユーザー名、「raspberrypi.local」はRaspbian標準のホスト名です。
「pi@raspberrypi.local's password:」などとユーザー「pi」のパスワードを求められたら「raspberry」と入力します。

ログアウトする場合は

$ exit

シャットダウンするばあいは

$ sudo shutdown -h now

です。

Python 3 のバージョン確認

RaspbianにはPythonがはじめからインストールされています。
ただしpythonコマンドではPython 2が実行されます。
Python 3の実行にはpython3コマンドを使います。
詳細なバージョンを確認してみましょう。

$ python3 --version
Python 3.7.3
pipのインストール

Pythonパッケージのインストールのためにpipをインストールしておきましょう。

$ sudo apt-get update
$ sudo apt-get install python3-pip

pipについてもpip3コマンドを使うことになります。
例えばapschedulerをインストールするのであれば以下のコマンドになります。

$ pip3 install apscheduler
プログラムの配置

コマンドプロンプトからscpコマンドでRaspberry Piにファイルをコピーできます。
カレントディレクトリーにある「stock-price-watcher」ディレクトリー以下を
ホームディレクトリー内にコピーする場合は

> scp -rp stock-price-watcher pi@raspberrypi.local:~/

「-rp」はディレクトリー以下を再帰的にコピーし、コピー元のファイルの更新日時を保ってコピーします。

パッケージのインストール

プログラムの実行にあたって必要なパッケージをインストールします。

$ pip3 install apscheduler
$ pip3 install bs4
$ pip3 install dataclasses_json
環境変数の設定

プログラムの実行にあたって必要な環境変数の設定をします。
例えば環境変数GMAIL_USER」に「hoge@gmail.com」をセットするのであれば以下のコマンドになります。

$ export GMAIL_USER=hoge@gmail.com
Pythonプログラムの実行

実行したいプログラムの*.pyファイルがあるディレクトリーに移動してから
python3コマンドを使ってプログラムを実行します。

$ cd ~/stock-price-watcher
$ python3 main_multiple_code.py

Ctrl + C キーでプログラムを停止します。

Pythonプログラムのバックグラウンド実行

上記の方法で実行した場合ログアウトとすると
プログラムの実行が終了してしまいます。
ログアウトしても終了しないようにするためにはバックグラウンド実行をする必要があります。
具体的には以下のコマンドでバックグラウンド実行します。

$ nohup python3 main_multiple_code.py &

バックグラウンド実行したプログラムを停止する場合は
まず以下のコマンドでプロセスの一覧を表示して
停止したいプロセスのプロセス番号を確認します。

$ ps -fu pi

プロセスのプロセスID(PID)がわかったら
それを使ってkillコマンドでプログラムを停止します。
例えばPIDが17870であれば以下のコマンドで停止します。

$ kill 17870


これでRaspberry Pi上でPythonプログラムを実行することができるようになりました。
次回から本題のビットコインに入っていきたいと思います。

Pythonで株価アラートをつくる(3)

前回まででPythonで株価アラートプログラムをつくりました。

これを実際に使うとなると、どこかでこのプログラムを動かし続ける必要があります。
そのためには何を使うのがいいでしょうか?

この記事は「Raspberry Pi + Pythonビットコイン自動取引」へとつなげる、そこで使う要素技術を身につけるための記事という位置づけです。

そして「Raspberry Pi + Pythonビットコイン自動取引」では

  • 高実現性
  • 低コスト

をコンセプトにしたいと考えています。

プログラムを動かし続けるというとすぐに浮かぶのはクラウドサービスかと思います。
WindowsLinuxの仮想サーバーであれば手元のPCと同じ感覚で使えます。
しかし基本的に有償、従量課金となっています。料金体系の理解、理解した上での利用が必要となります。動かし続けるとなるとサーバーのスペックを絞っても月1500円程度はかかるでしょう。

プログラムを動かすというとPaaSという選択肢もありますが、こちらも動かし続けるとなると有償というものが多く、またストレージへの書き込みに制限があったりとそのプラットフォームに合わせた工夫が別途必要になってきます。

考えてみれば外部サービスは、提供側はそれで儲けようとしているわけですから、それなりに費用はかかりますよね。

そこで内部に目を向けます。
実現性という意味では既に手元のPCでプログラムを動かしているわけで、自分のPCが一番です。
手元のPCを動かし続ければ…?いわゆる自宅サーバーですね。
この場合のネックは電気代です。あと、普段使うPCでとはいかないでしょうからPC代もでしょうか。

電気代は例えばノートPCだとつけっぱなしにすると月1800円とかかかってきますね。

参考:
パソコンの電気代はどれくらい?節約するにはどうしたら良いの?

普通のPCと同じ感じで使えて、安価で、消費電力が少ないPCはないでしょうか…?

あぁ、ありますね、「Raspberry Pi」。

Raspberry Pi であれば数千円で買えて、消費電力もノートPCの1/10とか。

Raspberry Pi + Pythonビットコイン自動取引」の片方のキーワードが出てきました。

ということで自宅でRaspberry Piを使って株価アラートプログラムを動かします。

次回は具体的にRaspberry Piとその他必要なものを選び、セットアップ、プログラムの実行まで行います。

Pythonで株価アラートをつくる(2)

Raspberry Pi + Pythonビットコイン自動取引」を目指して
まずは株価アラート、の2回目。

前回はアラート条件をソースコードに埋め込んでしまっていたので
それを外部のアラート条件定義ファイルで定義できるようにする。
複数銘柄の監視にも対応させる。

アラート条件定義ファイル

アラート条件定義ファイルは、手での編集のしやすさ、プログラムからの扱いやすさ、表現力を考えて以下のようなjsonファイルとする。

{
    "alertConditions": [
        {
            "alertConditionId": "f870215c-944a-11ea-8747-185e0fdcc982",
            "stockCode": "4755",
            "conditionType": "isEqualToOrHigher",
            "values": [
                1000
            ],
            "isEnabled": true
        },
        {
            "alertConditionId": "f870215d-944a-11ea-80ac-185e0fdcc982",
            "stockCode": "6758",
            "conditionType": "isEqualToOrHigher",
            "values": [
                5000
            ],
            "isEnabled": false
        }
    ]
}

各プロパティの説明は以下の通り。
これで複数銘柄のアラート条件を定義できる。
条件の種類としてはとりあえず基本的な「X円以上になったら」「X円以下になったら」の2種類に対応させることにする。

プロパティ 説明
alertConditions アラート条件の配列
alertConditionId アラート条件のID
stockCode 銘柄コード
conditionType アラート条件の種類
"isEqualToOrHigher":X円以上になったら
"isEqualToOrLower":X円以下になったら
values アラート条件に使う値の配列(現状1つの値しか使わない)
isEnabled 有効か否か(このアラート条件について監視を行うか)
アラート条件のクラス化

ファイルのフォーマットが決まったので
まずはファイルを扱うクラスを「alert_condition.py」としてつくる。

dataclass, dataclasses_jsonを使うと
そのクラスのインスタンスjsonにしたり
jsonからそのクラスのインスタンスをつくったり
といった機能を持つクラスが簡単につくれる。

そこでdataclass, dataclasses_jsonを使ってアラート条件定義ファイルのjsonに対応したAlertConditionクラス、AlertConditionListクラスを作成する。

これだけの定義でもう alert_condition_list.to_json() でクラスのインスタンスjsonにしたり、AlertConditionList.from_json() でjsonからクラスのインスタンスをつくったりできてしまう。

from typing import List, Dict
from dataclasses import dataclass
from dataclasses_json import LetterCase, dataclass_json
import uuid
from collections import OrderedDict


class AlertConditionType:
    IS_EQUAL_TO_OR_HIGHER = 'isEqualToOrHigher'
    IS_EQUAL_TO_OR_LOWER = 'isEqualToOrLower'

@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class AlertCondition:
    alert_condition_id: str
    stock_code: str
    condition_type: str
    values: List[int]
    is_enabled: bool

@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class AlertConditionList:
    alert_conditions: List[AlertCondition]


class AlertConditionUtil:
    JSON_FILE_NAME = 'conditions.json'
    
    @classmethod
    def save(cls, alert_condition_list: AlertConditionList):
        with open(cls.JSON_FILE_NAME, 'w', encoding='utf-8') as f:
            f.write(alert_condition_list.to_json(indent=4, ensure_ascii=False))

    @classmethod
    def load(cls) -> AlertConditionList:
        with open(cls.JSON_FILE_NAME, 'r', encoding='utf-8') as f:
            return AlertConditionList.from_json(f.read())
    
    @staticmethod
    def generate_id() -> str:
        return uuid.uuid1()


if __name__ == '__main__':

    ac1 = AlertCondition(AlertConditionUtil.generate_id(), '4755', AlertConditionType.IS_EQUAL_TO_OR_HIGHER, [1000], True)
    ac2 = AlertCondition(AlertConditionUtil.generate_id(), '6758', AlertConditionType.IS_EQUAL_TO_OR_HIGHER, [5000], True)

    acl = AlertConditionList([ac1, ac2])

    AlertConditionUtil.save(acl)
    new_acl = AlertConditionUtil.load()
    print(new_acl.to_json(indent=4, ensure_ascii=False))

あとはjsonファイルへの保存、jsonファイルからの読み込み、アラート条件IDの生成といったユーティリティメソッドを持ったAlertConditionUtilクラスもつくっておく。

実行してみるとアラート条件定義ファイル「conditions.json」が作成される。

アラート条件に従った監視

アラート条件定義ファイルからアラート条件を読み込むことができるようになったので
このアラート条件を使って株価の監視をしてみる。

import os
from apscheduler.schedulers.background import BackgroundScheduler
from mail_sender import MailSender
from stock_price import StockPrice
from time import sleep
from alert_condition import AlertConditionUtil, AlertConditionType
import datetime

GMAIL_USER = os.environ.get("GMAIL_USER")
GMAIL_PASSWORD = os.environ.get("GMAIL_PASSWORD")
TO_ADDRESS = os.environ.get("TO_ADDRESS")

mail_sender = MailSender(GMAIL_USER, GMAIL_PASSWORD, TO_ADDRESS)

is_condition_enabled = True
is_exception_raised = False


def update():
    global is_exception_raised

    acl = AlertConditionUtil.load()

    # 有効なアラート条件の銘柄コードのユニークなリストを取得
    stock_codes = [ac.stock_code for ac in acl.alert_conditions if ac.is_enabled]
    stock_codes = list(set(stock_codes))

    # 株価を取得
    ticker_dict = {}
    try:
        for stock_code in stock_codes:
            ticker = StockPrice.get_ticker(stock_code)
            ticker_dict[stock_code] = ticker
    except Exception as exception:
        mail_sender.send_mail('update() Exception Alert', f'exception:\n{exception}')
        is_exception_raised = True
        return

    # アラート条件の確認
    for alert_condition in acl.alert_conditions:
        if not alert_condition.is_enabled:
            continue

        stock_code = alert_condition.stock_code
        values = alert_condition.values
        ticker = ticker_dict[stock_code]
        if alert_condition.condition_type == AlertConditionType.IS_EQUAL_TO_OR_HIGHER:
            if ticker.price >= values[0]:
                mail_sender.send_mail(f'[株価アラート] 銘柄コード:{stock_code}', 
                                      f'銘柄コード:{stock_code}の株価が{values[0]}円以上になりました。')
                alert_condition.is_enabled = False

        elif alert_condition.condition_type == AlertConditionType.IS_EQUAL_TO_OR_LOWER:
            value = alert_condition.values[0]
            if ticker.price <= value:
                mail_sender.send_mail(f'[株価アラート] 銘柄コード:{stock_code}', 
                                      f'銘柄コード:{stock_code}の株価が{values[0]}円以下になりました。')
                alert_condition.is_enabled = False

    AlertConditionUtil.save(acl)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()

    # 毎分0秒に実行する
    scheduler.add_job(update, 'cron', second=0, max_instances=10)
    scheduler.start()

    try:
        while True:
            if is_exception_raised:
                break

            sleep(0.001)

    except KeyboardInterrupt:
        pass

    scheduler.shutdown()

前回作成した監視と基本は同じ。主な違いは以下の点。

  • アラート条件をアラート条件定義ファイルから読み込んだものを使っていること
  • 複数銘柄対応のためにループしていること

それから一定時間ごとの処理は毎分0秒のタイミングで実行するようにした。
apschedulerだとこの変更がパラメーターの変更だけでできてしまう。

実行すると、プログラムは毎分0秒のタイミングでアラート条件に従った監視を行い、条件を満たすとその旨を知らせるメールを送信する。
一度条件を満たしたアラート条件については無効になる。

だいぶ実用的になってきたのではないだろうか。

しかし実際に使うとなると、プログラムを実行しつづけなくてはならない。
PCをずっと起動しておくの…?
電気代大丈夫…?

ということで、次回はどう動かし続けるかについて書きたい。