Drools Fusionのbrokerサンプルについて

平田です。

先日のBRMS 5.3のハンズオンではComplex Event Processing (CEP: 複合イベント処理) の説明や演習が無かったので、調査がてらDrools Fusionのbrokerサンプルを眺めてみました。

Drools Fusion

Drools Fusionは、DroolsエンジンにおけるCEPモジュールです。

初めてDroolsを調べたとき、「Complex Event Processing」という字面だけ見て「なぜにイベントがファーストクラス扱いなの?」と思って掘り下げなかったのですが、CEPという技術用語らしいです。検索すると色々と出てきますが、不正検知やリソース最適化などに使われているようです。

brokerサンプル概要

Drools Fusionのサンプルは、githubDrools Downloadsページの「Drools and jBPM integration」から取得できます。サンプルプログラムが三つ入っており、brokerがDrools Fusionのサンプルになります。

brokerは、いわゆるアルゴリズムトレードのプログラムです。runExamples.batを実行すると、株価ボードが起動し、値動きに応じて自動的に株式売買を行い、そのログが画面右側に出力されます。

brokerプログラム

brokerプログラム

brokerは以下の様な構成要素から成ります。

  • org.drools.examples.broker.BrokerExample
    • mainメソッドを持つブートストラップクラス
  • org.drools.examples.broker.Broker
    • このプログラムのコントローラ
    • receiveメソッドが値動きの度に呼ばれます。
  • org.drools.examples.broker.ui パッケージ
    • Swing GUIコンポーネント
  • org.drools.examples.broker.events, misc, model パッケージ
    • 値動きのイベントやルール用のファクト、ユーティリティなど
  • src/main/resources
    • stocktickstream.dat
      • 値動きのテストデータ
      • 各列はそれぞれ「開始からのミリ秒」「銘柄」「株価」
    • broker.drl, notify.drl, position.drl
      • ルール定義
    • position.rf
      • 値動きを検知した際のルールフロー

Javaプログラム部分の動き

brokerプログラムは、stocktickstream.datが発生させる値動きイベントに反応して動きます。EventFeederやStockTickPersisterが、この辺りの処理を実装しています。

値動きの度にBrokerコントローラのreceiveが呼ばれ、ルールが起動されます。

    public void receive(Event<?> event) {
        try {
            StockTick tick = ((Event<StockTick>) event).getObject(); //値動きイベントデータ
            Company company = this.companies.getCompany( tick.getSymbol() );
            this.tickStream.insert( tick );
            this.session.getAgenda().getAgendaGroup( "evaluation" ).setFocus();
            this.session.fireAllRules(); //ルール起動
            window.updateCompany( company.getSymbol() );
            window.updateTick( tick );

        } catch ( Exception e ) {
            System.err.println("=============================================================");
            System.err.println("Unexpected exception caught: "+e.getMessage() );
            e.printStackTrace();
        }
    }

値動きイベント処理

値動きイベントに関する処理は、broker.drlに定義されています。”Update stock price”ルールでは、値動きのたびにワーキングメモリ内のファクトデータ(Company: 銘柄、StockTick: 値動き)を更新しています。

# a simple rule to show that it is possible to join
# events from an entry-point (stream) with facts
# present in the working memory
rule "Update stock price"
    agenda-group "evaluation"
    lock-on-active
when
    $cp : Company( $sb : symbol )
    $st : StockTick( symbol == $sb, $pr : price ) from entry-point "StockTick stream"
then
    // This shows an update on working memory facts with data from joined events
    modify( $cp ) { currentPrice = $pr }

    // Although events are considered immutable, a common pattern is to use a class
    // to represent an event and enrich that event instance with data derived from other facts/events.
    // Below we "enrich" the event instance with the percentual change in the price,
    // based on the previous price
    modify( $st ) { delta = $cp.delta }
end

また、”sudden drop”で、下げ幅が5%を超えた場合にSuddenDropEventを発生させています。

# this rule shows a trick to get the last available event as well as
# how to call global services from the consequence
rule "sudden drop"
    enabled true
    agenda-group "report"
when
    $st : StockTick( $sb : symbol, $ts : timestamp, $pr : price, $dt : delta < -0.05 ) from entry-point "StockTick stream"
    not( StockTick( symbol == $sb, timestamp > $ts ) from entry-point "StockTick stream" )
then
    services.log( "Drop >5%: "+$sb+" delta: "+ Utils.percent($dt)+" price: $"+$pr );

    # we also want to create an event and forward it into the engine to a predefined entry point
    # that is being listened by other rules
    with( sde = new SuddenDropEvent() ) {
        symbol = $sb,
        percent = $dt,
        timestamp = $ts
    }
    entryPoints["Analysis Events"].insert( sde );
end

値下がりイベント処理

SuddenDropEventが発生すると、position.drlの”Start adjust position process”がマッチし、position.rfで定義したルールフローを開始します。

# here we have an example of a rule that controls a process
rule "Start adjust position process"
when
    $sde : SuddenDropEvent( ) from entry-point "Analysis Events"
then
    variables = [ "symbol" : $sde.symbol ];
    drools.getKnowledgeRuntime().startProcess( "adjust position", variables );
end
ルールフロー

ルールフロー

position.rfでは、まず”Evaluate Position”ノードでposition.drlの”If the drop is…”のルールを実行し、下げ幅に応じたアクション(買い、売り、スルー)を決定します。

# below we have rules controlled by the process,
# i.e., the process will fire these rules when necessary
# to re-evaluate the position
rule "If the drop is between 6% and 8%, buy more shares"
    ruleflow-group "evaluate position"
when
    $sde : SuddenDropEvent( percent >= -0.08 && < -0.06 ) from entry-point "Analysis Events"
then
    with( pa = new PortfolioAction() ) {
        action = Action.BUY,
        symbol = $sde.symbol,
        quant = 100
    }
    insert( pa );
end    

rule "If the drop is on more than 8%, sell shares"
    ruleflow-group "evaluate position"
when
    $sde : SuddenDropEvent( percent < -0.08 ) from entry-point "Analysis Events"
then
    with( pa = new PortfolioAction() ) {
        action = Action.SELL,
        symbol = $sde.symbol,
        quant = 100
    }
    insert( pa );
end    

rule "If the drop is between 5% and 6%, do nothing"
    ruleflow-group "evaluate position"
when
    $sde : SuddenDropEvent( percent >= -0.06 ) from entry-point "Analysis Events"
then
    with( pa = new PortfolioAction() ) {
        action = Action.NOACTION,
        symbol = $sde.symbol
    }
    insert( pa );
end

ワーキングメモリに投入されたPortfolioActionをもとにルールフローの分岐を遷移し、アクションを実行(ログ出力のみ)します。

アクション実行後、Notifyノードからnotify.drlに定義されたファクトの削除処理が呼ばれます。

rule "Portfolio action no longer needed"
        dialect "mvel"
        ruleflow-group "notify"
    when
        $pa : PortfolioAction()
    then
        retract( $pa );
end

まとめ

「全部Javaで書けば良いじゃん」と思うかもしれませんが、ルール部分は差し替え可能であり、プログラムの修正無しに下げ幅の閾値を調整したり、アクションを追加できるのがポイントになります。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です