内部構成
[注意] この文書は,ソースコードを読む際の理解を助けるために書かれたものです.
概念および構造の説明に重点を置き,網羅的な解説はしていません.
適宜,ソースコードを参照しながら読んでください.
概観
Kinoko システム全体の構造を図 1.1 に示します.Kinoko のシステムは,大きく分けて,ユーザインターフェース部分の KinokoShell と,データ収集システムの機能を実現する KinokoKernel ,およびシステムのスタートアップを行う Loader から構成されます.
ユーザインターフェース部分である KinokoShell は,機能実装部分である KinokoKernel から完全に分離されており,それらの間の通信は TCP/IP 経由の簡単なテキストコマンド (KOAP プロトコル) によってなされます.
KinokoShell は KinokoKernel とライブラリ等をリンクする必要がないため,基本的にどんな言語・ツールキットを使って実装してもよく,これにより多彩な GUI 環境に対応するとともに,KinokoKernel 部分を,プラットフォーム依存から開放します.
KinokoKernel の最上位レベルは,コンポーネントと呼ばれる機能単体の集合です.これらのコンポーネントはその実行環境である KinokoComponents (KCOM) の上で互いに協調して動作し,一つのシステムを構成します.各データ収集システムにおけるコンポーネントの配置・結合はスクリプトによって記述され,KinokoComponents 実行環境により実際に配置・実行されます.KinokoComponents 実行環境は Loader によって起動されます.
KinokoKernel の構造を図 1.2 に示します.図 1.1 中のコンポーネントは,図 1.2 の最上位の Component Implementations 層に対応します.
Component Implememtations 層は,その下層にあるデータ収集システムの各機能の実装層(図1.2中段右)と,コンポーネントの機能を提供する KinokoComponents 層(図 1.2 中段左)を組合せ,各機能をコンポーネントとしてまとめたものです.
KinokoComponents は,透過的な分散オブジェクト環境を実現する KinokoORB を利用して実装されています.KinokoORB は,ネットワークプロセス間の多対多メッセージ交換を行う KinokoNetwork をその下位に含んでいます.
システムの最下位には,汎用の機能を提供するライブラリおよびシステムサービスや外部ライブラリへのインターフェースを提供するライブラリが置かれ,これらは上位の層(KinokoComponents,KinokoORB 層およびDAQの各機能の実装層)により利用されます.ここで提供される機能は以下の通りです.
- KinokoScript
- 特定の文法に依存しない構文解析を行うライブラリ.
- KinokoDrivers
- データ収集系で使用されるデバイス(VME や CAMAC とそのモジュール) へのアクセスをカプセル化するライブラリ.
- KinokoPlatform
- UNIX が提供するシステムサービスへのアクセス(システムコール)をカプセル化するライブラリ.
- KDBC: Kinoko DataBase Connectivity
- 外部 SQL データベースへのインターフェースライブラリ.
- KMLC: Kinoko Math Library Connectivity
- 外部数学関数へのインターフェースライブラリ.
ソースツリーとコーディング規約
ディレクトリ構成
パッケージを展開すると,kinoko の下に以下のディレクトリが作成されます.
- bin
- コンパイル後に実行可能ファイルが格納されます
- include
- コンパイル後にヘッダファイル類が格納されます.
- lib
- コンパイル後にライブラリファイルが格納されます.
- devices
- コンパイル後に KinokoDrivers のドライバファイルが格納されます.
- src
- Kinoko のソースコードがあります.
- drv
- Kinoko に付属のデバイスドライバがあります.以下のサブディレクトリがあります.
- ・camdrv: CAMAC ドライバ
- 現在は,Linux 2.x 上で東陽テクニカの CC-7x00 および豊伸電子の CCP-PCI が利用できます.
- ・vmedrv: VME ドライバ
- 現在は,Linux 2.x で SBS Technololies (Bit3) の Model-617/618/620 が利用できます.
- scripts
- Kinoko が実行時に使用するスクリプト類が格納されています.
- local
- サンプルやチュートリアルが置かれています.かつては測定固有のスクリプト類を置く場所でしたが,これは現在は推奨されていません(Kinoko のバージョンアップで置き換えられてしまうため).
- tmp
- 空です.Kinoko が実行時に作業ディレクトリとして使うことがあります.
src ディレクトリ以下は,次のような構造になっています.
- kernel: C++ のみで実装されたシステムの中核になる部分
- lib-common: kinokoシステム以外にも利用できる部分を分離したライブラリ群
- mush: システムコールへのインターフェースライブラリ (独立)
- room: DAQ デバイスへのインターフェースライブラリ (独立)
- parasol: 汎用構文解析エンジン (独立)
- kame: Ntuple や Histogram などの解析部品ライブラリ (独立)
- kuma: 数学ライブラリ (独立)
- kdbc: SQL データベースインターフェース (独立)
- kmlc: 外部数学ライブラリインターフェース (独立)
- kisc: 拡張スクリプトライブラリ (上記全てに依存)
- korb: 分散オブジェクトシステム (上記全てに依存)
- kcom: コンポーネントシステム (上記全てに依存)
- lib-domain: kinoko の DAQ システムの各機能を実装したライブラリ群
- buffer: データバッファ (KinokoBuffer)
- stream: 抽象データストリーム (KinokoStream)
- format: データデスクリプタ/データフォーマッタ
- readout: データ読みだし系
- builder: イベントビルダ
- analysis: オンラインデータ解析系
- view: オンラインデータビューア系
- storage: ストレージインターフェース
- control: システムコントロール系
- component: 各機能をコンポーネントで包んだもの
- process: ライブラリの実行ポイントを main() で包んだもの
- shell: いろいろな GUI ライブラリを使ったユーザインターフェース部分
- c++: C++ のライブラリによるユーザインターフェース
- gtk: GTK ライブラリによるユーザインターフェース
- tcltk: Tcl/Tk によるユーザインターフェース
- utils: 関連ユーティリティ
資源利用の方針
シグナル
各サブシステムで使われているシグナルは以下のとおりです.
- KinokoORB は,プロセスが通常の手順で終了しなかった場合に,それらを強制的に正常終了させるためにシグナルを使用します.
- KinokoORB および KinokoComponents のメインループは SIGINT,SIGTERM または SIGHUP を受け取ると終了し,プロセスの終了処理に移行します.
インターバルタイマ
短時間ディレイやタイムアウト処理のためいくつかの場所でインターバルタイマーが使われています.
これらは全て,設定・開始・停止の一連の処理がブロック内の小さい範囲で完結しています.
スレッド
今のところ使っていません.ハードウェアインターフェースをはじめ,スレッドセーフでない部分がいくつかあります.
グローバルパラメータ
環境変数
- KINOKO_ROOT
- KCOM_PATH
- KORB_PROJECT_PATH
- KINOKO_XTERM
- KINOKO_RSH
- KINOKO_CONTROL_PORT_BASE
- KINOKO_DATASTREAM_PORT_BASE
- KINOKO_SHELL_PORT_BASE
- KINOKO_WEB_ROOT
IPC キー
固定パラメータ
- 自動割り当てデータソースIDの範囲
- 自動割り当てセクションIDの範囲
- ポーリングループのスリープ時間の初期値
- KinokoORB ループのスリープ時間
- KinokoComponents ループのスリープ時間
- Buffer 通信ループのスリープ時間
KinokoComponents
システムを,再利用可能な機能単体に分割し,それを後から組み立てることによってシステム構築を行うようにすると,システム自体の再利用性が高くなり,また,新たなシステム構築や変更,保守などが極めて容易になります.ただし,これが可能になるためには,これらの機能単体が高い独立性とカスタマイズ性をもち,かつ,互いに容易に結合できて,統合された一つのシステムを構築できなければなりません.独立性が高くても,それらの間の結合が(単なるソケット通信のように)貧弱ではシステム構築に労力がかかりますし,逆に特定の目的に特化した通信では,再利用性や独立性が保てません.
このような,システム部品となりえる,独立で完備な機能単体を,コンポーネントと呼びます.一般に,コンポーネントとは,以下のような特徴を持ったものを指します.
- システム構成要素として,独立かつ完備
- 明確に定義された外部インターフェース宣言をもつ
- コンポーネント間の直接の通信経路を持たない
- コンテナやアセンブリ環境などの稼働環境のもとで,相互に接続可能
- システム内において,その機能をカスタマイズできる
- 永続性をもつこともある
ここで,永続性を持つコンポーネントをエンティティコンポーネント,持たないコンポーネントをセッションコンポーネントと呼ぶことが多いようです(もっと細かい分類ももちろんあります).
Kinoko においてこのコンポーネントの機能を実装するのが,KinokoComponents サブシステムです.
KinokoComponents におけるコンポーネントとは,オンラインシステムの各機能をプロセスレベルで実装したセッションコンポーネントで,以下のような特長をもちます.
- 非同期に相互参照可能なプロパティ
- イベントとスロットによるコンポーネント間同期通信
- コンポーネント内オブジェクトのエクスポートとインポート
- スクリプトによるカスタマイズ
- ネットワーク透過的なフレームワーク
- ファイル,ソケット,端末ウィンドウなどと結合できるプライベート入出力チャネル
KinokoComponents の使用例
コンポーネントの例として,ここでは kinoko/src/kernel/lib-common/kcom/samples にある Fibonacci コンポーネントと StartStop コンポーネントを見てみます.Fibonacci コンポーネントは,外部からスタートイベントを受け取ると,ストップイベントを受け取るまでの間,フィボナッチ数列を順に計算し,プライベート出力チャネルに出力します.また,StartStop コンポーネントは,プライベート入出力チャネルからのユーザのコマンドにより,スタートやストップなどのイベントを発行します.
KinokoComponents のコンポーネントはプロセスレベルコンポーネントなので,コンポーネント1つに対応してプロセス1つが存在します.Fibonacci コンポーネントに対応するプロセスの実行ファイル名は Fibonacci-kcom,StartStop コンポーネントは StartStop-kcom です.
コンポーネントプロセスを引数無しで実行すると,そのコンポーネントのインターフェース宣言を表示します.
% ./Fibonacci-kcom
//
// Fibonacci.kidl
//
// sample component process for KinokoComponents
// calculates Fibonacci series
//
// Author: Enomoto Sanshiro
// Date: 27 Mar 2000
// Version: 0.1
component Fibonacci {
accepts setInitial(int first, int second); // set a[0] and a[1] value
accepts start(); // start calculating the Fibonacci series
accepts stop(); // stop calculating the Fibonacci series
accepts clear(); // clear index
accepts quit(); // quit
}
ここで,accepts はイベントを受け取るスロットを宣言しています.
同様にして,StartStop コンポーネントのインターフェース宣言も見てみます.
% ./StartStop-kcom
// コメント部分は省略
component StartStop {
property string user_name;
uses MessageBoard message_board;
emits start();
emits stop();
emits clear();
emits quit();
}
ここで,emitはこのコンポーネントが発行するイベントを,property は他コンポーネントから参照可能なプロパティを,uses はインポートオブジェクトを宣言しています.
もう一つ,同じディレクトリにある MessageBoard コンポーネントを見てみましょう.これは,MessageBoard というオブジェクトをエクスポートしています.このオブジェクトはメッセージの書き込みを受け付け,それを普通のC++のI/Oストリームに書き出します.この出力は MessageBoard コンポーネントにより,プライベート入出力チャネルに結合されています.
% ./MessageBoard-kcom
// コメント部分は省略
component MessageBoard {
provides MessageBoard message_board;
accepts quit();
}
provides でエクスポートオブジェクトを宣言しています.
さて,ここで実際にこれらのコンポーネントを用いて,小さなシステムを構築してみましょう.
ここでしなければならないことは以下の通りです.
- 使用するコンポーネントを宣言する
- コンポーネントを配置し,プライベート入出力チャネルを割り当てる
- エクスポートオブジェクトとインポートオブジェクトを結合させる
- コンポーネントのイベントとスロットを結合させる
- 必要ならプロパティの初期値を設定する
KinokoComponents では,これらをスクリプト (KCOM スクリプト) で行います.
以下は,このスクリプトの例です.
// コンポーネントインターフェース宣言の取り込み
import Fibonacci;
import StartStop;
import MessageBoard;
// コンポーネントの宣言,配置,プライベート入出力チャネルの結合
component StartStop controller("localhost", "port: 20000");
component Fibonacci fibonacci("localhost", "display: localhost:0.0");
component MessageBoard message_board("localhost", "file: messages.log");
// エクスポートオブジェクトとインポートオブジェクトの結合
assign controller.message_board => message_board.message_board;
// 以下,全てイベントとスロットの結合
on startup()
{
fibonacci.setInitial(0, 1);
}
on controller.start()
{
fibonacci.start();
}
on controller.stop()
{
fibonacci.stop();
}
on controller.clear()
{
fibonacci.clear();
}
on .quit()
{
fibonacci.quit();
message_board.quit();
exit;
}
KinokoComponents の構造
図 2.1 に KinokoComponents の構造の概要を示します(実際にはもう少し複雑です.また,クラス名も若干異なります).
KinokoORB と KinokoNetwork
オブジェクト指向の分散並列処理系である Kinoko のインフラストラクチャとして,統合された分散オブジェクト環境を実現するのが,KinokoORB および KinokoNetwork サブシステムです.KinokoORB は,遠隔メソッド呼び出し (RMI: Remote Method Invocation) を実現する ORB (Object Request Broker) を核とし,ネーミングコンテキスト(分散環境下のオブジェクトに対するディレクトリサービス),リモートオブジェクトリファレンス,オブジェクトストリームなどの各機能を実装します.KinokoORB により,ネットワーク上の遠隔オブジェクトを,自プロセス内のローカルオブジェクトと同様に扱えるようになり,透過的な分散処理環境を実現します.
KinokoNetwork は KinokoORB の下位にあって,ネットワークプロセス間の多対多メッセージ交換を実現します.局所的なプロセスを接続するノードと,ノード間の自由な接続から構成される Hub-and-Spoke 構造をしており,動的なノードの追加と,動的な経路探索に基づくメッセージパケット交換を行います.これにより,自由な形での計算機やプロセスの追加・接続が可能になり,柔軟なシステム構築を実現します.
KinokoORB の構造
図 3.1 に KinokoORB の構造の概要を示します.
KinokoNetwork の構造
KinokoNetwork のルーティングストラテジ
KinokoScripts
Kinoko は,コンポーネントの実行時構築をはじめ,データ読みだし手順の記述やオンラインデータディスプレイなど,多くの場所でスクリプトを使用します.スクリプトにより記述される対象はコンポーネントからモジュールやヒストグラムまで様々であり,スクリプト言語自体もインタプリタ的なものから構造記述言語(VHDLのような)的なものまで様々です.そして,それぞれの場に応じて,スクリプトの適切な文法が異なります.それぞれの記述対象に合った文法のスクリプトを使用するため,Kinoko は様々な文法のスクリプトを扱える必要があります.
これを実現するのが,KinokoScripts です.KinokoScripts は解析対象の文法に依存しない構文解析エンジンです.
再帰下降法をはじめとする従来の構文解析手法の多くは,構文解析プログラムの構造によって文法を規定していました.その方法は確立したものではあるものの,文法がプログラムの構造を直接決めるため,そのプログラムは文法の変更に全く適応できませんでした.そのため,yacc などのパーザジェネレータが普及し,ある決まった文法に対し,解析コードをその都度(しかし自動で)生成する,という方法がとられてきました.
Kinoko では,再帰下降法アルゴリズムの若干の改良と,オブジェクト指向の利用(主に Prototype パターンと Factory Method パターンの利用)により,この問題をある程度解消し,再利用可能な構文解析ライブラリを構築しました.このライブラリでは,型の扱いで若干拡張性に欠ける点があるものの,ほとんどの手続き型言語の文法をサポートし,演算子や文などの実行時定義すら可能になっています.
このライブラリは,Parasol という名称で,Kinoko とは独立に配布しています.Parasol の詳細については,Parasol ホームページを参照してください.
KinokoDrivers
KinokoDrivers は,データ収集に使用されるハードウェアデバイスへのアクセスをカプセル化します.KinokoDrivers は,さらにコントローラドライバ(KinokoControllerDrivers) と モジュールドライバ (KinokoModuleDrivers) に分けられ,それぞれ各デバイスへアクセスする機能を実装するとともに,デバイス間の差異を吸収し,統一的なインターフェースを提供します.
コントローラドライバ
現在,コントローラドライバには CAMAC コントローラドライバと VME コントローラドライバがあります.これらのドライバはバスにアクセスする汎用の機能を提供し,この機能はモジュールドライバに利用されます.そのため,モジュールドライバでは,バス規格のみに依存し,使用するコントローラの詳細に依存しないコードを書くことができます.
CAMAC コントローラドライバおよび VME コントローラドライバが提供する機能の一覧を以下に示します(詳細については,kinoko/src/kernel/lib-common/room にある RoomCamacAccess.hh および RoomVmeAccess.hh を参照してください).
CAMAC コントローラドライバが提供する API
VME コントローラドライバが提供する API
モジュールドライバ
モジュールドライバは,コントローラドライバの提供する機能を使用し,各モジュールの機能を使用するためのコードを実装します.例えば,林栄精器の電荷積分型 ADC RPC-022 では,モジュールからのデータ読みだしは,CAMAC アクションの F0 により行います.この場合,このモジュールに対応するモジュールドライバの,データ読みだしインターフェースは,以下の様に実装されます.
int TCamacQADC_Rinei_RPC022::Read(int Address, int &Data) throw(THardwareException)
{
static const int ReadFunction = 0;
int Q, X;
_CamacController->CamacAction(
_StationNumber, Address, ReadFunction, Data, Q, X
);
if (! X) {
throw THardwareException("TCamacQADC_Rinei_RPC022::Read()", "No X-response");
}
return Q;
}
この例のように,ハードウェアにアクセスするための複雑な機能はほとんど全てコントローラドライバが提供するので,モジュールドライバの実装は一般に極めて容易です.
モジュールドライバの実装すべきインターフェースは,インターフェースクラス TModule によって規定されています.このインターフェースクラスは,CAMAC や VME などバスの種類によらず,同一のものを使用します.以下に,モジュールドライバが実装するインターフェースの一覧を示します.
モジュールドライバが実装するインターフェース
各モジュールドライバに対して,これらのインターフェース全てを実装する必要はありません.通常,Read() または BlockRead() などの,ごく一部の機能のみが実装されます.実装していない機能が使用された場合は,実行時に THardwareException が投げられます.
KinokoDrivers の自動組み込み
KinokoPlatform
MUSH ホームページ: http://www.awa.tohoku.ac.jp/~sanshiro/kinoko/mush/
KDBC: Kinoko DataBase Connectivity
KDBC は,SQL データベースシステムへのインターフェースです.
各 SQL データベースに対応した KDBC ドライバと,KDBC マネージャなどのいくつかのユーティリティクラスから構成されています.KDBC ドライバ部分で各データベースシステムの実装の差異を吸収することにより,データベースの実装によらない統一されたインターフェースを提供します.
KDBC のインターフェース
以下は,KDBC の使用例です.ここでは,データベースへの接続を確立した後,標準入力から SQL 文を読み,問い合わせを実行して,結果を表示しています.
#include <iostream>
#include "KdbcManager.hh"
using namespace std;
int main(int argc, char** argv)
{
const char* DatabaseType = "PostgreSQL";
const char* DatabaseName = "mydatabase";
// ドライバの生成とデータベースコネクションの取得
TKdbcDriver* Driver = 0;
TKdbcConnection* Connection = 0;
try {
Driver = TKdbcManager::GetInstance()->CreateDriver(DatabaseType);
Connection = Driver->CreateConnection(DatabaseName);
}
catch (TKdbcException &e) {
// エラー処理
}
string Query;
TKdbcResult* Result = 0;
while (cout << "sql> ", getline(cin, Query)) {
try {
// SQLの実行
Result = Connection->ExecuteSql(Query);
}
catch (TKdbcException &e) {
// エラー
cerr << e << endl;
continue;
}
if (Result->IsQueryResult()) {
// クエリ結果の表示
int NumberOfColumns = Result->NumberOfColumns();
while (Result->Next()) {
for (int Column = 0; Column < NumberOfColumns; Column++) {
cout << Result->GetString(Column) << " ";
}
cout << endl;
}
else {
// update 等の結果
cout << Result->NumberOfAffectedRows() << " rows affected" << endl;
}
delete Result;
}
delete Connection;
delete Driver;
return 0;
}
KDBC ドライバの構造
データバッファ(KinokoBuffer)
データストリーム(KinokoStream)
KinokoStream は,データバッファやソケットなどのデータ転送手段を抽象化し,統一された簡便なインターフェースを提供します.KinokoStream には,入力用の KinokoInputStream と,出力用の KinokoOutputStream の2種類があり,そこからバッファアクセス用,ソケットアクセス用などの各実装クラスが派生されます.
- 入力ストリーム
-
TKinokoInputStream |
|
+HasData(): bool
+NextDataSize(): size_t
+Read(Address: void*, MaxSize: size_t): int
+NextEntry(Address: void*&): int
+Flush(Address: void*): void
+GetEntry(Address: void*&): int
+ReleaseEntry(Address: void*): void
+SetBufferSize(BufferSize: int): void
|
- 出力ストリーム
-
TKinokoOutputStream |
|
+Write(Address: const void*, Size: size_t): int
+NextEntry(Address: void*&, MaxSize: size_t): int
+Flush(Address: void*, Size: size_t): int
+GetEntry(Address: void*&, MaxSize: size_t): int
+ReleaseEntry(Address: void*, Size: size_t): int
+SetBufferSize(BufferSize: int): void
|
|
TKinokoOutputSocketStream |
|
TKinokoOutputBufferStream |
|
|
入力ストリーム,出力ストリームともに,以下の3種類のインターフェースがあります.
- Read()/Write()
- ユーザ側でメモリを確保して,そのポインタを渡す形式です.ファイルストリーム,ソケットストリームに対して効率的に働きます.バッファストリームでは,余分なコピーが生じる可能性があります.
- NextEntry()/Flush()
- 内部に1データブロック分のメモリを確保して,そのポインタを返す方式です.ユーザ側の処理が終ったら,データブロックごとに Flush() を呼んでメモリを開放しなければなりません.ファイルストリーム,ソケットストリームでは余分なコピーが生じる可能性があります.バッファストリームでは,データバッファ上の共有メモリがそのまま使われ,効率的に働きます.
- GetEntry()/ReleaseEntry()
- 内部にバッファを作成し,そのポインタを返す方式です.NextEntry()/Flush() 形式と似ていますが,バッファ内に複数のデータブロックを収容することができます.ユーザ側の処理が終ったら,ReleaseEntry() を呼んでメモリを開放しなければなりません.ただし,ReleaseEntry() の呼出し順が GetEntry() の呼出し順と同じである必要はありません.ファイルストリーム,ソケットストリームではバッファ用に大きなメモリが必要で,余分なコピーが生じる可能性があります.内部バッファの大きさは SetBufferSize() で指定できます.バッファストリームでは,データバッファ上の共有メモリがそのまま使われ,効率的に働きます.
入力ストリームでは,Read() / NextEntry() / GetEntry() は利用可能なデータが到着するまで実行を停止します.出力ストリームでも,Write() / NextEntry() / Flush() / GetEntry() / ReleaseEntry() は,(特にバッファストリームやソケットストリームなどで)書き込み待ちなどになった場合,書き込み可能になるまで実行を停止します.この間にシグナルを受信すると,アドレス,サイズ共に 0 を設定して直ちにリターンします.
入力ストリームでは,さらに以下のメソッドも利用可能です.
- bool HasData(void)
- すぐに読みだせるデータがある場合に true を,無い場合に false を返します.
- int NextDataSize(void)
- 次のデータブロックのサイズを返します.すぐに読みだせるデータが無い場合,データが到着するまで待ちます.データを待っている間にシグナルを受信すると,直ちに終了し 0 を返します.
オンラインデータフォーマット
オンラインデータストリーム中のデータは,パケットに分割され,パケット単位でやりとりされます.以下は,パケットの基本構造です.パケットは,一部の例外を除き,32bit ワードの集合で構成されます.
ここで,DataSourceID は,データ生成元(データソース)が持つユニークな整数です(0x0001以上0xffff以下).このIDにより,データストリームが途中で合流してパケットが混ざっても,データの生成元を判別できます.また,DataSourceID フィールドの上位 16bit は 0 で埋められています.これにより,データのバイトオーダを判別することができます(DataSourceId が 0xffff 以上ならバイトオーダが逆).
パケットは,以下に示すようにいくつかの種類があります.種類ごとに一つの整数が割り当てられ,この値は PacketType フィールドに格納されます.
- データデスクリプタパケット (PacketType = 0x0001)
- このデータソースから流される全てのデータの構造と意味を記述します.
- コントロールコマンドパケット (PacketType = 0x0002)
- ランの開始や終了など,コマンドの伝達のために送られます.コマンドごとにサブタイプがあり,PacketType フィールドの下位 16bit (図のXXXX)に以下の値が格納されます.
- RunBegin: 0x0001
- RunEnd: 0x0002
- RunSuspend: 0x0003
- RunResume: 0x0004
- トレイラパケット (PacketType = 0x0004)
- データの構造を表現するために,データの区切りに挿入されます.区切りの位置ごとにサブタイプがあり,PacketType フィールドの下位 16bit (図のXXXX)に以下の値が格納されます.
- データパケット (PacketType = 0x0008)
- データを格納するパケットです.PacketType フィールドの下位 16bit (図のXXXX)には,データの意味を表す SectionID (後述)が格納されます.
一つのデータ生成元(データソース)から来るデータストリームのパケット構成は,以下のようになります.
ここで,イベントデータブロックは以下のようなパケットの集合です.
データパケット
データパケットは,以下に示すように,データサイズを示すヘッダと,データを格納するデータ領域 (DataArea) から構成されます.データサイズはバイト単位で表され,DataArea 中の有効なデータのサイズを示します(DataArea 自体のサイズと異なることもある).
ここで,DataArea に格納されるのは,データセクションと呼ばれるデータの分割単位です.セクションにはいくつかの型があり,そのセクションデータの集合によってイベントデータが構成されます.一つのイベントデータには,可変任意個のセクションデータを含めることができます.各セクションデータは,パケットヘッダ中の SectionID により識別されます.
以下は,現在実装されているセクションの型の概要です.
- Indexed
- 最も基本的なセクション型で,セクションデータはアドレスとデータのペアであるデータエレメントの集合で表されます.アドレス,データとも,0bit から 32bit までの任意の固定長にすることができます.それぞれのデータエレメントは,先頭からインデクスが振られ,このインデクスで各データエレメントにアクセスすることができます.セクション中のエレメント数は可変です.
アドレスとデータの長さの和が 32bit 以下の場合の DataArea のフォーマットは以下の通りです.
アドレスとデータの長さの和が 32bit より大きい場合の DataArea のフォーマットは以下の通りです.
- Tagged
- 特定の固定した構造を持ったデータを表現するためのセクション型で,テーブルまたは構造体に類似しています.セクションデータはデータのみからなるデータエレメントの固定数の集合で表されます.データはは0bitから32bitまでの任意の固定長にすることができます.データエレメントはフィールドと呼ばれ,各フィールドには名前(タグ)が付けられます.セクションデータ中のデータエレメントは,このタグ名で参照できます.
DataArea には,一つの 32bit ワードに一つのフィールドが記録されます.
- Block
- 他のセクションでは表現できない構造のデータを格納するための型です.データセクションはデータサイズのみを管理し,中身の構造に関しては一切関知しません.
データサイズは任意に決められますが,DataArea のサイズは,ワード境界に揃えられます(125 バイトのデータなら 128 バイト使われる).
- Nested
- 任意のセクションを任意個まとめて格納し,一つのセクションとする型です.
DataArea には,ネストされたセクションがそのまま格納されます.
データデスクリプタ
データデスクリプタパケットは,データストリームの先頭に送られ,いくつかのパラメータと,ストリームに含まれる全データパケットのセクション構造を記述します.フォーマットはテキストで,datasourceキーワードに続き DataSourceID とデスクリプタ本体が記述されます.本体中では,attribute キーワードに続きパラメータ(属性)が宣言され,section キーワードに続きセクション構造が宣言・定義されます.
datasource "データソース名"<DataSourceID> {
attribute 属性名 = "値";
attribute 属性名 = "値";
// 任意回の繰り返し
section "セクション名"<SectionID>: セクション定義
section "セクション名"<SectionID>: セクション定義
// 任意回の繰り返し
};
セクション定義は,セクション型ごとに,以下のようになります.
- Indexed
-
indexed (
address: int-アドレス長bit,
data: int-データ長bit
);
- Tagged
-
tagged {
field "フィールド名": int-データ長bit;
field "フィールド名": int-データ長bit {
値 -> "値の別名";
// 任意回の繰り返し
}
// 任意回の繰り返し
}
- Block
-
block;
- Nested
-
nested {
セクション宣言: セクション定義
// 任意回の繰り返し
}
以下は,データデスクリプタの例です.
datasource "CamacAdcTdc"<7>
{
attribute creator = "KinokoCollector-kcom";
attribute creation_date = "Sat Nov 18 02:16:08 JST 2000";
attribute auxiliary_file = "CamacAdcTdc.kts";
attribute auxiliary_file_fingerprint = "463805253352";
section "Adc"<2>: indexed(address: int-8bit, data: int-24bit);
section "Tdc"<3>: indexed(address: int-8bit, data: int-24bit);
}
datasource "KamLAND_01"<5>
{
attribute creator = "KinokoCollector-kcom";
attribute creation_date = "Sat Nov 18 02:16:08 JST 2000";
attribute auxiliary_file = "KamLAND.kts";
attribute auxiliary_file_fingerprint = "385734805701";
section "ATWD_00"<4>: block;
section "ATWD_01"<5>: block;
section "ATWD_02"<6>: block;
section "ATWD_03"<7>: block;
}
datasource "KamLAND Built Data"<1>
{
attribute creator = "KamEventBuilder-kcom";
attribute creation_date = "Sat Nov 18 06:44:37 JST 2000";
section "Event"<3>: tagged {
field "Trigger": int-32bit;
field "Time": int-32bit;
field "Energy": int-32bit;
}
section "ATWD"<4>: nested {
section "Header"<7>: tagged {
field "ATWD Channel": int-2bit;
field "Time Stamp (H)": int-16bit;
field "Time Stamp (L)": int-32bit;
field "Launch Type": int-5bit {
0 -> "Normal";
1 -> "Pedestal";
2 -> "Other";
}
}
section "HitTime"<8>: indexed(address: int-0bit, data: int-32bit);
section "WaveForm"<9>: indexed(address: int-0bit, data: int-16bit);
}
}
データアクセスインターフェース
Kinoko では,これらのフォーマットのデータブロックにアクセスするための一連のクラスを用意しています.基本となるのは以下のクラスです.
- TKinokoDataDescriptor
- データデスクリプタパケットの生成と解読,およびデータソースオブジェクトのリストを保持・管理します.
- void AddDataSource(TKinokoDataSource* DataSource)
- データソースオブジェクトを内部リストに登録.
- TKinokoDataSource* DataSource(long DataSourceId)
- 内部リストから DataSourceId のデータソースオブジェクトを取得.
- void ReadFrom(istream& is)
- データデスクリプタテキストを is から読み込み,解読する.テキスト中のデータソースエントリからデータソースオブジェクトを生成して内部リストに登録.
- void WriteTo(ostream& os)
- データデスクリプタテキストを os に書き出す.
- TKinokoDataSource
- データデスクリプタパケット中の datasource エントリの生成と解読,およびデータセクションオブジェクトのリストを保持・管理します.
- TKinokoDataSource(const string& DataSourceName, long DataSourceId)
- コンストラクタ.
- const string& DataSourceName(void) const
- データソース名を返す.
- void AddDataSection(TKinokoDataSection* DataSection)
- データセクションを内部リストに登録.
- void AddAttribute(const string& Name, const string& Value)
- データソース属性を内部リストに登録.
- TKinokoDataSection* DataSection(int SectionId)
- 内部リストから SectionIdのデータセクションオブジェクトを取得.
- TKinokoDataSection
- データデスクリプタパケット中の datasection エントリの生成と解読を行います.セクション型に応じて,以下のサブクラスがあります.
- TKinokoIndexedDataSection
- TKinokoTaggedDataSection
- TKinokoBlockDataSection
- TKinokoNestedDataSection
- TKinokoDataSection(TKinokoDataSource* DataSource, const string& SectionName)
- コンストラクタ.
- int SectionType(void)
- セクションタイプを enum 定数で返す.この定数はこのクラスで public に宣言されていて,以下のいずれかの値を取る.
- SectionType_Indexed
- SectionType_Tagged
- SectionType_Block
- SectionType_Nested
- const string& SectionName(void) const
- セクション名を返す.
- string FullSectionName(void) const
- セクションがネストしている場合,最外セクションからの全てのセクション名をコロンでつなげた名前を返す.
- TKinokoIndexedDataSection
- TKinokoDataSection の派生クラスで,Indexed 型セクションに対しデータデスクリプタパケット中の section エントリの生成と解読,およびこのセクション型のデータブロックを読み書きするためのオブジェクト Formatter/Scanner を生成・管理します.
- TKinokoIndexedDataSection(TKinokoDataSource* DataSource, const string& SectionName)
- コンストラクタ.
- void SetWidth(int AddressWidth, int DataWidth)
- アドレスフィールドおよびデータフィールドの幅を設定
- TKinokoIndexedDataSectionFormatter* Formatter(void)
- フォーマッタオブジェクトを(必要なら)生成し,返す.
- TKinokoIndexedDataSectionScanner* Scanner(void)
- スキャナオブジェクトを(必要なら)生成し,返す.
- TKinokoTaggedDataSection
- TKinokoDataSection の派生クラスで,Tagged 型セクションに対しデータデスクリプタパケット中の section エントリの生成と解読,およびこのセクション型のデータブロックを読み書きするためのオブジェクト Formatter/Scanner を生成・管理します.
- TKinokoTaggedDataSection(TKinokoDataSource* DataSource, const string& SectionName)
- コンストラクタ.
- int AddField(const string& FieldName, int ValueWidth, TValueNameTable* ValueNameTable = 0)
- フィールドを追加.
- int FieldIndexOf(const string& FieldName)
- フィールドのインデックスを名前から取得.
- string FieldNameOf(int FieldIndex)
- フィールド名(タグ名)をインデックスから取得.
- int NumberOfFields(void)
- セクションに含まれるフィールド数を返す.
- TKinokoTaggedDataSectionFormatter* Formatter(void)
- フォーマッタオブジェクトを(必要なら)生成し,返す.
- TKinokoTaggedDataSectionScanner* Scanner(void)
- スキャナオブジェクトを(必要なら)生成し,返す.
- TKinokoBlockDataSection
- TKinokoDataSection の派生クラスで,Block 型セクションに対しデータデスクリプタパケット中の section エントリの生成と解読,およびこのセクション型のデータブロックを読み書きするためのオブジェクト Formatter/Scanner を生成・管理します.
- TKinokoBlockDataSection(TKinokoDataSource* DataSource, const string& SectionName)
- コンストラクタ.
- TKinokoBlockDataSectionFormatter* Formatter(void)
- フォーマッタオブジェクトを(必要なら)生成し,返す.
- TKinokoBlockDataSectionScanner* Scanner(void)
- スキャナオブジェクトを(必要なら)生成し,返す.
- TKinokoNestedDataSection
- TKinokoDataSection の派生クラスで,Nested 型セクションに対しデータデスクリプタパケット中の section エントリの生成と解読,およびこのセクション型のデータブロックを読み書きするためのオブジェクト Formatter/Scanner を生成・管理します.
- TKinokoNestedDataSection(TKinokoDataSource* DataSource, const string& SectionName)
- コンストラクタ.
- void AddSection(TKinokoDataSection* DataSection)
- 子セクションを追加.
- TKinokoDataSection* SubSection(int SectionId)
- SectionIdを持つ子セクションを返す.
- TKinokoNestedDataSectionFormatter* Formatter(void)
- フォーマッタオブジェクトを(必要なら)生成し,返す.
- TKinokoNestedDataSectionScanner* Scanner(void)
- スキャナオブジェクトを(必要なら)生成し,返す.
入力ストリームなら,TKinokoDataDescriptorのオブジェクトを一つ生成し,データデスクリプタパケットの内容を引数に ReadFrom() を呼べば,対応するデータソース,データセクションオブジェクトが全て構築されます.あとは,各データパケットに付いている DataSourceID, DataSectionID をもとに対応するオブジェクトを探せば,データブロックの解読に必要な情報を得ることができます.さらに,データセクションオブジェクトの生成するスキャナオブジェクトを使用すれば,データブロック中のデータに簡単にアクセスすることができます.
逆に出力ストリームなら,TKinokoDataDescriptorのオブジェクトを一つ生成し,必要なだけデータソースオブジェクト,データセクションオブジェクトを生成して Add()することにより,デスクリプタを構築します.全て登録したら,TKinokoDataDescriptor::WriteTo() より,データデスクリプタテキストを生成できます.データブロックは,各データセクションが生成するフォーマッタオブジェクトを使用すると,簡単に作成することができます.
以下に,Indexed セクションと Tagged セクションのフォーマッタとスキャナのメソッドを示します.他のセクション型も基本的に同様なので,詳しくはソースコードを参照してください.
- TKinokoIndexedDataSectionFormatter
- int DataSizeFor(int NumberOfElements)
- 引数に渡される NumberOfElements 個のデータエレメントを格納するのに必要なデータパケットのサイズ (DataSourceIdなどの全てのパケットヘッダを含む) を返します.
- int WriteTo(void* Buffer, int Index, int Address, int Data)
- 第一引数にパケットの先頭アドレスを取り,パケット内の Index 番目のデータエレメントに Address と Data を書き込みます.
- TKinokoIndexedDataSectionScanner
- int NumberOfElements(void* Buffer)
- データブロックに含まれるデータエレメント数を返します.
- void ReadFrom(void* Buffer, int Index, int& Address, int& Data)
- 第一引数にデータパケットの先頭アドレスを取り,データパケットの Index 番目のデータエレメントのアドレスとデータをそれぞれ引数 Address と Data に返します.
- TKinokoTaggedDataSectionFormatter
- int DataSize(void)
- 必要なデータパケットのサイズ (DataSourceIdなどの全てのパケットヘッダを含む) を返します.
- int WriteTo(void* Buffer, int FieldIndex, int Value)
- 第一引数にパケットの先頭アドレスを取り,パケット内の FieldIndex 番目のフィールドに Value を書き込みます.フィールドインデックスは TKinokoTaggedDataSection の FieldIndexOf()メソッドで取得できます.また,AddField() の戻り値としても得ることができます.
- TKinokoTaggedDataSectionScanner
- void ReadFrom(void* Buffer, int FieldIndex, int& Value)
- 第一引数にデータパケットの先頭アドレスを取り,データパケットの FieldIndex 番目のフィールドの値を引数 Value に返します.フィールドインデックスは TKinokoTaggedDataSection の FieldIndexOf()メソッドで取得できます.
データストリームコンポーネント
Kinoko のコンポーネント群は,データの読み出し・転送・解析・表示・記録などを行うデータストリームコンポーネントと,データ収集システムをコントロール・監視するシステムコンポーネントに大きく分類されます.データストリームコンポーネントはさらに,データバッファなど,データの受渡しに受動的なコンポーネントと,自ら書き込み要求・読みだし要求をする能動的なコンポーネントに分けられます.能動的なコンポーネントには,データストリームに対する作用の方向に応じて,ソース(湧き出し,Source),シンク(吸い込み,Sink),およびその両方であるパイプ(Pipe)の3つのタイプがあります.
これらの分類されたコンポーネントのは,コンポーネントのタイプに対応したクラスの系統によって実装されます.以下の表は,これらのコンポーネントに対応するクラスの継承関係を表したものです.太字のクラスが具象クラスで,実行時に実際にコンポーネントとしてインスタンス化されるもの,それ以外のクラスがコンポーネントの分類を表現し,共通機能を実装する抽象クラスです.
- KcomComponent
- KinokoSystemComponent
- KinokoControllerCom
- KinokoLoggerCom
- KinokoStreamComponent
- KinokoBufferCom
- KinokoActiveStreamComponent
- KinokoStreamSourceComponent
- KinokoStreamSinkComponent
- KinokoRecorderCom
- KinokoViewerCom
- KinokoStreamPipeComponent
- KinokoAnalyzerCom
- KinokoTransporterCom
データストリームコンポーネントは,ストリームの接続や内部オブジェクトの構築など,データ収集を行う前にさまざまな構築作業を行う必要があります.また,終了時にも,順を追ったシステム解体の作業が必要です.特に,データバッファを除くデータストリームコンポーネント (ActiveStreamComponent) では,データ収集を開始する前に,以下のような多くの手順を必要とします.
- コンポーネントが配置・起動されたら,動作ホストや接続ポートなどのプロパティをレジストリに登録して,他のコンポーネントから参照できるようにする.
- setSource()/setSink() などのイベントを待ち,接続先,接続形態(ソケットやバッファなど)の情報を得る.
- ソケットストリームのサーバなら,ポートを接続待機にする.
- ソケットやバッファなどの接続を確立する.
- 必要ならスクリプトを解読し,内部オブジェクトを構築する.
- 取り扱うデータを記述したデータデスクリプタを処理する(送る/受け取る).
以上を全て終らせると,start() イベントによりデータ収集を開始できる状態になります.
データ収集を終了し,システムを終了する場合も,やはり順々にシステムを解体していく必要があります.
- データ収集を終了する.
- 全てのデータが処理されるのを待つ.
- 内部オブジェクトの終了処理をする.
- データストリームへの接続を解除する.
- コンポーネントフレームワークへの接続を解除する.
- 終了する.
システムにおいて,これらの処理をコンポーネント間で同期をとりながら進めて行かなくてはなりません.kinoko では,これらの手順の進行状況に対応して「状態」定義し,各コンポーネントを有限状態系として実装します.状態間の遷移がコンポーネントフレームワークのベントにより発生するようにし,各状態と状態間遷移に上記の各手順を行うようにすることで,全体が同期をとりながら協調して動作するようにしています.
以下は,ActiveStreamComponent の状態遷移を表した状態図です.ここで,状態遷移を引き起こすイベントは,特に断りがない限り,コンポーネントフレームワークのイベントです.
- ComponentReady
- コンポーネントが起動して,最初に来るステートです.ここでコンポーネントプロパティをレジストリに登録し,他のコンポーネントから参照できるようにします.また,この間に setSource() / setSink() などのイベントを受け,接続先のコンポーネントに関する情報を得ます.ストリームがソケットストリームで,自分がデータの受け側であれば,ポートを開き,接続待機状態にします(listen(2) する).connect() イベントを受け取ると,Connecting ステートに遷移します.
- Connecting
- ソケットストリームでのコネクションの確立,バッファストリームでのバッファへの接続などを行います.接続が確立したら,次の StreamReady ステートに移行します.
- StreamReady
- データストリームが使用可能で,内部オブジェクトは構築されていない状態です.Construct() イベントにより,内部オブジェクトの構築などをおこない SystemReady ステートに移行します.また,quit() イベントによって,システムを終了します.
- SystemReady
- データ収集を開始できるが,実際には行っていない状態です.ソースコンポーネントでは,start() イベントによって Running 状態に移行します.シンクコンポーネントおよびパイプコンポーネントでは,ストリームから RunBegin パケットを受け取ることによって,Running 状態に移行します.どちらのコンポーネントタイプでも,destruct() イベントによって内部オブジェクトを解体し,StreamReady ステートに戻ります.
- DataTaking
- データ収集を行います.ソースコンポーネントでは,stop() イベントによって SystemReady 状態に戻ります.シンクコンポーネントおよびパイプコンポーネントでは,ストリームから RunEnd パケットを受け取ることによって,SystemReady 状態に戻ります.
- ComponentReady ステートで connect() イベントを待ってから Connecting ステートに移行するのは,サーバ側のソケットがすべて listen を完了するのを確認してからクライアントが connect をするようにするためです(setSource() イベントスロットは,listen が完了するまでリターンしません).
- SystemReady ステートから DataTaking ステートへの移行は,データの湧き出しであるソースコンポーネントの場合は start() イベントによって,それ以外のコンポーネントの場合は接続されているデータストリームのいづれかから RunBegin パケットを受け取ることによって引き起こされます.
- DataTaking ステートから SystemReady ステートへの移行は,ソースコンポーネントの場合は stop() イベントによって,それ以外のコンポーネントの場合は RunBegin を受け取った全てのデータソースから RunEnd パケットを受け取ることによって引き起こされます.
- Sink コンポーネントや Pipe コンポーネントにおいて stop() イベントを処理しないのは,ストリーム中に残っている可能性のあるデータを全て処理するためです.
以下,それぞれのクラスについて個別に見ていきます.
- KinokoStreamComponent
- 全てのストリームコンポーネントの元になるクラスです.全てのコンポーネントタイプとコンポーネントのステートを宣言し,共通のインターフェース(Logger オブジェクトのインポートなど)を実装します.
TKinokoStreamComponent
|
# _Logger: TKinokoLoggerProxy*
# _State: TState = {State_Initial, State_ComponentReady, ...}
|
+ BuildDescriptor(Descriptor: TKcomComponentDescriptor&)
+ ProcessEvent(EventId: int, Event: TKcomEvent&)
# ChangeState(State: TState): void
|
以下は,このクラスに対応するコンポーネントのコンポーネント宣言です.
<<component>>
KinokoStreamComponent
|
+ stream_type: string = {"source", "sink", "pipe", "buffer" }
+ state: string = {"Initial", "ComponentReady", "Connecting", ...}
+ host: string
# logger: Logger*
|
|
- KinokoActiveStreamComponent
- KinokoStreamComponent を継承し,ストリームに対して能動的に作用する全てのコンポーネントの元になるクラスです.現在のところ,データバッファを除く全てのデータストリームコンポーネントがこのタイプになります.
このクラスでは,このタイプのコンポーネントの全ての状態と状態遷移を定義し,connect(),disconnect() および quit() イベントを処理します.ただし,SystemReady ステートと DataTaking ステート間の遷移は,メソッド StartDataProcessing() および StopDataProcessing() の呼び出しによってなされます.
このクラスから派生されるソースコンポーネントでは,start() や stop() などのイベントを受けとったとき,これらのメソッドを呼び出すようにします.それ以外のコンポーネントでは,保持している _StreamCommandProcessor によってこれらのメソッドが呼び出されます.派生クラスにおいて,ストリームからコマンドパケットを受け取ったら,必ずこの _StreamCommandProcessor の ProcessCommandPacket() メソッドを呼び出すようにしなければなりません.
さらに,このクラスで派生クラスのために SetSource() および SetSink() メソッドを提供しています.このメソッドをあらかじめ呼び出しておくと,connect() イベントを受け取ったとき,このクラスによってストリームの接続が行われます.
加えて,このクラスでは,コンポーネント状態に応じて呼び出されるいくつかのメソッドを純粋仮想関数として宣言しています.これらは,子クラスで適切に定義されなくてはなりません.
TKinokoActiveStreamComponent
|
# _SourceStreamType: TStreamType = { StreamType_Source, ...}
# _SinkStreamType: TStreamType = { StreamType_Source, ...}
# _InputDataStream: TKinokoInputStream*
# _OutputDataStream: TKinokoOutputStream*
# _StreamCommandProcessor: TKinokoStreamCommandProcessor*
|
+ BuildDescriptor(Descriptor: TKcomComponentDescriptor&)
+ ProcessEvent(EventId: int, Event: TKcomEvent&)
+ DoTransaction(void)
- ProcessConnectEvent(Event: TKcomEvent&): int
- ProcessDisconnectEvent(Event: TKcomEvent&): int
- ProcessQuitEvent(Event: TKcomEvent&): int
- Connect(): void
- DoSystemReadyStateTask(): int
# SetProperties(): void
# Construct(): void
# Destruct(): void
# ProcessData(): int
# OnStart(): void
# OnStop(): void
+ StartDataProcessing(): void
+ StopDataProcessing(): void
# SetSource(SourceComponentName: const string&)
# SetSink(SinkComponentName: const string&)
# AllocateInputPortNumber(): long
# GetDataSourceId(): int
|
以下は,このクラスに対応するコンポーネントのコンポーネント宣言です.親クラスから継承するぶんのインターフェースは含まれていません.
<<component>>
KinokoActiveStreamComponent
|
|
+ connect()
+ disconnect()
+ quit()
|
- TKinokoStreamSourceComponent
- TKinokoActiveStreamComponent を継承し,ソースコンポーネントの機能を追加したクラスです.start() および stop() イベントを処理し,TKinokoActiveStreamComponent の StartDataProcessing() または StopDataProcessing() メソッドを呼び出します.
また,setSink() イベントを処理し,TKinokoActiveStreamComponent の setSink() メソッドを呼び出します.ストリームの確立に必要なプロパティ(ホスト名,ストリームタイプなど)の設定も行います.
TKinokoStreamSourceComponent
|
|
+ BuildDescriptor(Descriptor: TKcomComponentDescriptor&)
+ ProcessEvent(EventId: int, Event: TKcomEvent&)
- ProcessSetSinkEvent(Event: TKcomEvent&): int
- ProcessStartEvent(Event: TKcomEvent&): int
- ProcessStopEvent(Event: TKcomEvent&): int
# SetProperties(): void
|
以下は,このクラスに対応するコンポーネントのコンポーネント宣言です.親クラスから継承するぶんのインターフェースは含まれていません.
<<component>>
KinokoStreamSource
|
|
+ setSink(sink_component: KinokoStreamComponent)
+ start()
+ stop()
|
- TKinokoStreamSinkComponent
- KinokoActiveStreamComponent を継承し,シンクコンポーネントの機能を追加したクラスです.setSource() イベントを処理し,TKinokoActiveStreamComponent の setSource() メソッドを呼び出します.ストリームの確立に必要なプロパティ(ホスト名,ストリームタイプ,ポート番号など)の設定も行います.
TKinokoStreamSinkComponent
|
|
+ BuildDescriptor(Descriptor: TKcomComponentDescriptor&)
+ ProcessEvent(EventId: int, Event: TKcomEvent&)
- ProcessSetSourceEvent(Event: TKcomEvent&): int
# SetProperties(): void
|
以下は,このクラスに対応するコンポーネントのコンポーネント宣言です.親クラスから継承するぶんのインターフェースは含まれていません.
<<component>>
KinokoStreamSink
|
+ port_number: int
|
+ setSource(source_component: KinokoStreamComponent)
|
- TKinokoStreamPipeComponent
- TKinokoActiveStreamComponent を継承し,パイプコンポーネントの機能を追加したクラスです.setSourceSink() イベントを処理し,KinokoActiveStreamComponent の setSource() および setSink() メソッドを呼び出します.ストリームの確立に必要なプロパティ(ホスト名,ストリームタイプ,ポート番号など)の設定も行います.
TKinokoStreamPipeComponent
|
|
+ BuildDescriptor(Descriptor: TKcomComponentDescriptor&)
+ ProcessEvent(EventId: int, Event: TKcomEvent&)
- ProcessSetSourceSinkEvent(Event: TKcomEvent&): int
# SetProperties(): void
|
以下は,このクラスに対応するコンポーネントのコンポーネント宣言です.親クラスから継承するぶんのインターフェースは含まれていません.
<<component>>
KinokoStreamPipe
|
+ port_number: int
|
+ setSourceSink(source_component, sink_component: KinokoStreamComponent)
|
以上のクラスにより,データストリームの確立,状態遷移イベントの処理などは全て行われます.残っているのは,データ収集状態において行う処理の記述と,その準備と片付けです.以下のメソッドが純粋仮想関数として残っています.
- void Construct(void) throw(TKinokoException)
- StreamReady 状態から SystemReady 状態に遷移する時に,内部オブジェクトを構築するために呼ばれます.
- void Destruct(void) throw(TKinokoException)
- SystemReady 状態から StreamReady 状態に遷移する時に,内部オブジェクトを解体するために呼ばれます.
- int ProcessData(void) throw(TKinokoException)
- SystemReady 状態または Running 状態中に,ストリームのデータを処理するために,繰り返し呼ばれます.
さらに,必要なら,以下のメソッドを定義すれば,それぞれの状態遷移時に呼び出されます.
- void OnStart(void) throw(TKinokoException)
- Running 状態に遷移したときに呼び出されます.
- void OnStop(void) throw(TKinokoException)
- Running 状態から他の状態に遷移したときに呼び出されます.
Destruct() が呼ばれた後に,再び Construct() が呼ばれる可能性があることに注意してください.
以下に,KinokoSourceStreamComponent クラスを継承した具象クラスの例として,Collector コンポーネントの実装を示します.ここでは,残っていた純粋仮想関数を全て定義して,さらに独自のイベントスロットを一つ追加しています.
このクラスに対応するコンポーネントのコンポーネント宣言は以下のようになります(親クラスから継承するぶんのインターフェースは含まれていません).
オンラインアナリシスフレームワーク
データストリームコンポーネントのフレームワークと,データストリームおよびストリームデータへのアクセスインターフェースを組み合わせれば,どんなデータストリームコンポーネントでも作成することができます.しかし,Kinoko の提供するオンラインアナリシスフレームワークを用いれば,多くの場合でデータストリームコンポーネントをより簡単に作成することができます.
アナリシスフレームワークの中核となるクラスは,TKinokoDataProcessor です.これは,ストリームパイプ型のコンポーネントで実装される解析コンポーネントの雛型になるものです.また,ストリームソース型コンポーネントに対応する TKinokoDataProducer や,ストリームシンク型コンポーネントに対応する TKinokoDataConsumer もあります.
TKinokoDataProcessor と TKinokoDataProducer は,データストリームへのデータ書き出し処理を支援する TKinokoDataSender から派生します.また,TKinokoDataProcessor と TKinokoDataConsumer は,ストリームからのデータ読み込みを支援する TKinokoDataReceiver から派生します.
TKinokoDataProcessor と TKinokoDataProducer および TKinokoDataConsumer に対し,これらの解析クラスをコンポーネント化するための薄いラッパーとして,それぞれ TKinokoDataProcessorCom, TKinokoDataProduderCom, TKinokoDataConsumerCom のクラスが用意されています.
これらのクラスの関係をまとめたクラス図を以下に示します.
以下,TKinokoDataProcessor を中心に説明します.他の2つのタイプに対しても,以下の説明はほぼそのまま適用できます.
まず,Sender および Receiver が,そこから派生されるクラスのために提供しているオブジェクトから説明します.
TKinokoDataReceiver は,子クラスのために以下の3つのオブジェクトを提供しています.
- TKinokoInputStream* _InputStream
- 入力用データストリーム
- TKinokoDataStreamScanner* _InputStreamScanner
- データストリームから受け取ったデータパケットから情報を取り出すオブジェクト
- TKinokoDataDescriptor* _InputDataDescriptor
- データストリームから来るデータのデータデスクリプタ
また,TKinokoDataReceiver は,子クラスのために以下の3つのオブジェクトを提供しています.
- TKinokoOutputStream* _OutputStream
- 出力用データストリーム
- TKinokoDataStreamFormatter* _OutputStreamFormatter
- データストリームに送り出すパケットに情報を書き出すオブジェクト
- TKinokoDataSource* _OutputDataSource
- データストリーム送り出すデータのデータソース定義オブジェクト
これらのオブジェクトは,Sender/Receiver にて構築され,派生クラスにおいて自由に使うことができます.
データストリームコンポーネントの章で述べたように,データストリームコンポーネントはまず Construct() を呼び出して内部オブジェクトを構築し,ストリームデータを処理する必要がある間に ProcessData() を繰り返し呼び出します.そして最後に Destruct() を呼び出して内部オブジェクトの解体を行います.Construct() が呼ばれたときはすでにデータストリームが構築され,データの受渡しが有効になっていることに注意してください.
TKinokoDataProcessorCom クラスは,これらの呼び出しを保持している _DataProcessor オブジェクトにそのまま伝えます.
TKinkoDataProcessor では,Comstruct() で Sender および Receiver の ConstructInlet()/ConstructOutlet() を呼び出して初期化し,BuildDataSource を呼んで出力用データソースオブジェクトを構築し,Sender の SendDataDescriptorPacket() を呼び出してこのデータソースから生成されるデータデスクリプタをパケットにしてストリームに送り出します.最後に,子クラスでオーバーライドできるメソッド OnConstruct() を呼び出して,子クラス独自の初期化をできるようにします.
Destruct() でも同様に,子クラスでオーバーライドできるメソッド OnDestruct() を呼び出します.
ProcessData() は,実際に解析計算を行うメソッドで,子クラスでオーバーライドされることを想定しています.TKinokoDataProducer では,このメソッドの中身は空です.TKinokDataProcessor および TKinokoDataConsumer では,このメソッドをオーバーライドしなかった場合,TKinokoDataReceiver の ProcessInputStream() を呼び出します.ここでは,ストリームからパケットを読み込んで,それがコマンドパケットまたはデータデスクリプタパケットであった場合は適当に処理を行い,データパケットに対しては OnReceiveDataPacket() を,イベントトレイラパケットに対しては OnReceiveEventTrailer() を呼び出します.さらに,全てのパケットに対して OnReceivePacket() を呼び出します.これらのメソッドも,中身は空で,必要に応じて子クラスでオーバーライドされることを想定しています.
もし,TKinokDataProcessor や TKinokoDataConsumer において,ProcessData() をオーバーライドした場合,データデスクリプタパケットやコマンドパケットを受け取ったら必ず TKinokoDataReceiver の ProcessDataDescriptorPacket() や ProcessCommandPacket() を呼び出してください.その他のパケットに対してこれらのメソッドを呼び出しても構いません.その場合は何も処理は行われず,値 false が返されます.これによって,いちいちパケットタイプを調べる手間が省けます.
以下に,TKinokoDataReceiver で実装されている ProcessInputStream() の内容を示します.自分で ProcessData() をオーバーライドして実装する際の参考にしてください.
int TKinokoDataReceiver::ProcessInputStream(void) throw(TKinokoException)
{
void* Packet;
int PacketSize;
if ((PacketSize = _InputStream->NextEntry(Packet)) == 0) {
return 0;
}
_InputStreamScanner->CorrectByteOrder(Packet, PacketSize);
OnReceivePacket(Packet, PacketSize);
if (ProcessDataPacket(Packet, PacketSize)) {
;
}
else if (ProcessTrailerPacket(Packet, PacketSize)) {
;
}
else if (ProcessCommandPacket(Packet, PacketSize)) {
;
}
else if (ProcessDataDescriptorPacket(Packet, PacketSize)) {
;
}
_InputStream->Flush(Packet);
return 1;
}
コンポーネントが,独自のデータを生成し,ストリームに流す場合,ストリームにはデータパケット以外にも以下のようなパケットを適切な場所で流す必要があります.
- データデスクリプタパケット
- 全てのパケットに先だって送られなければならない.データソースの構造を記述している.
- RunBegin/RunEnd パケット (コマンドパケット)
- それぞれ,ランの開始時と終了時に送られる.全てのデータパケットはこれら2つのパケットの間になければならない.
- イベントトレイラパケット
- イベントの最後に,イベントの区切りとして送られる.
すでに述べたように,データデスクリプタパケットは,Construct() メソッドから呼ばれた SendDataDescriptorPacket() により自動で送られます.RunBegin/RunEnd パケットは,TKinokoDataSender で実装されている SendRunBeginPacket()/SendRunEndPacket() を呼び出すだけでストリームに送られます.イベントトレイラパケットについても同様に,SendEventTrailerPacket() により送られます.
すでに持っているメモリ領域中にあるパケットを送りたい場合,TKinokoDataSender の SendPacket() を呼び出すだけで送ることができます.出力ストリーム上にメモリを確保して,そこにデータを書き出してから送りたい場合は,以下のように直接 TKinokoOutputStream を使うことになります.
void* Packet;
unsigned PacketSize = ... // 必要パケットサイズを計算 //
do {
_OutputStream->NextEntry(Packet, PacketSize);
} while (Packet == 0);
... // パケットへのデータ書き込み //
_OutputStream->Flush(Packet, PacketSize);
このコンポーネントの,実行可能形式(コンポーネントプロセス)へのラッパーは以下のようになります(ソース中の TMyDataProcessor がフレームワークをもとに作成したデータ解析クラスです).
int main(int argc, char** argv)
{
TMushArgumentList ArgumentList(argc, argv);
TKinokoDataProcessor* MyDataProcessor = new TMyDataProcessor();
TKcomComponent* Component = new TKinokoDataProcessorCom(MyDataProcessor);
TKcomProcess* ComProcess = new TKcomProcess(Component);
try {
ComProcess->Start(ArgumentList);
}
catch (TKcomException &e) {
cerr << "ERROR: " << argv[0] << ": " << e << endl;
}
delete ComProcess;
delete Component;
delete MyDataProcessor;
return 0;
}
読み出し系
オンラインデータビューア
ストレージインターフェース
Edited by: Enomoto Sanshiro