rsyslogのTLS(SSL)暗号化 でCollectorに対してFORWARDしましたが、その続編的な内容になります。
rsyslogにはMySQLやHDFSに保存するモジュールなど色々あり、Collectorで集めたログをどう扱うかと悩んだ結果、今回はMySQLでやってみることにしました。
リンク
選択理由
ログ量が少なくて重要度が低いなら、ローカルファイルでlogwatchとか、ログが膨大ならBigDataを間借りしてHDFS+Hiveとか考えたけど、
中規模以下ならMySQLが安定だな~と思った理由がこんな感じです。
RDBMSにとっちゃアタリマエですが、rsyslogであまりやらないことだから書いてみました。
複数CollectorはLVSを挟めば簡単に実現できます。
バージョン
OSはWheezyで、rsyslogは標準パッケージの 5.8.11 です。6.3.3 から設定記法が新しくなって、条件分岐とかちゃんと書けるようになるので、何気に楽しみだったりします(legacyだと無駄な分岐記述になったりするので…)。
MySQLのバージョンはなんでもいいです。
MySQL Server
MySQLのインストール
私はクセでPerconaを使いますが、好きなMySQLを入れてmy.cnf設定まで。何度も書いてるので手順は割愛します。
GRANT
作業内容はこんな感じです。
1 2 3 4 5 6 7 8 9 10 11 |
mysql -u root -p mysql mysql> GRANT INSERT, SELECT ON Syslog.* TO rsyslog@"10.20.30.%" IDENTIFIED BY 'password'; GRANT INSERT, SELECT, LOCK TABLES, SHOW VIEW ON Syslog.* TO rsyslog@localhost IDENTIFIED BY 'password'; GRANT RELOAD, SUPER,REPLICATION CLIENT ON *.* TO rsyslog@localhost IDENTIFIED BY 'password'; GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO repl@"10.20.30.%" IDENTIFIED BY 'password'; DELETE FROM user WHERE Password = ''; FLUSH PRIVILEGES; |
rsyslog用DB作成
デフォのDB名がSyslogなのでそのまま使います。SOURCEで作るテーブルはデフォ用テーブルですが、独自テーブルにする場合は不要です。
1 2 3 4 |
mysql> CREATE DATABASE Syslog; use Syslog; SOURCE /usr/share/dbconfig-common/data/rsyslog-mysql/install/mysql |
独自テーブル作成
デフォ用テーブルはかなり余計なカラムがあるので、お好みで作成します。月単位で削除しやすいようにパーティションも切っておきます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
mysql> # テーブル作成 CREATE TABLE `RemoteEvents` ( `ID` int(10) unsigned NOT NULL AUTO_INCREMENT, `ReportedAt` datetime DEFAULT NULL COMMENT 'timestamp from the message', `ReceivedAt` datetime DEFAULT NULL COMMENT 'timestamp when the message was RECEIVED', `HostName` varchar(80) DEFAULT NULL, `IPAddress` varchar(60) DEFAULT NULL, `Facility` smallint(6) DEFAULT NULL, `FacilityText` varchar(16) DEFAULT NULL, `Priority` smallint(6) DEFAULT NULL, `PriorityText` varchar(16) DEFAULT NULL, `Program` varchar(60) DEFAULT NULL, `Message` text, PRIMARY KEY (`ID`, `ReportedAt`), KEY (`ReportedAt`) ); # パーティションを切っておく ALTER TABLE RemoteEvents PARTITION BY RANGE ( TO_DAYS(ReportedAt) ) ( -- 2013 PARTITION p201307 VALUES LESS THAN ( TO_DAYS('2013-08-01') ), PARTITION p201308 VALUES LESS THAN ( TO_DAYS('2013-09-01') ), PARTITION p201309 VALUES LESS THAN ( TO_DAYS('2013-10-01') ), PARTITION p201310 VALUES LESS THAN ( TO_DAYS('2013-11-01') ), PARTITION p201311 VALUES LESS THAN ( TO_DAYS('2013-12-01') ), PARTITION p201312 VALUES LESS THAN ( TO_DAYS('2014-01-01') ), -- 2014 PARTITION p201401 VALUES LESS THAN ( TO_DAYS('2014-02-01') ), PARTITION p201402 VALUES LESS THAN ( TO_DAYS('2014-03-01') ), ~snip~ PARTITION p201811 VALUES LESS THAN ( TO_DAYS('2018-12-01') ), PARTITION p201812 VALUES LESS THAN ( TO_DAYS('2019-01-01') ), -- max PARTITION pmax VALUES LESS THAN MAXVALUE ); # 未来を見据えてとことんパーティション追加 ALTER TABLE RemoteEvents REORGANIZE PARTITION pmax INTO ( -- 2019 PARTITION p201901 VALUES LESS THAN ( TO_DAYS('2019-02-01') ), PARTITION p201902 VALUES LESS THAN ( TO_DAYS('2019-03-01') ), ~snip~ PARTITION p202211 VALUES LESS THAN ( TO_DAYS('2022-12-01') ), PARTITION p202212 VALUES LESS THAN ( TO_DAYS('2023-01-01') ), PARTITION pmax VALUES LESS THAN MAXVALUE ); |
これでMySQLサーバの準備は完了です。
rsyslogサーバ
rsyslogのTLS(SSL)暗号化 の続きなので、サーバとして受信する設定は設定済みとします。モジュール
MySQLのクライアントとなるrsyslogサーバに、モジュールのパッケージを入れます。
1 2 3 4 5 6 |
apt-get install rsyslog-mysql dbconfig-common No # デフォの設定は邪魔なので除ける mv /etc/rsyslog.d/mysql.conf /etc/rsyslog.d/mysql.conf~example |
転送ログ一部削除の設定
FORWARDされてきたものの、MySQLに保存しないで破棄する用の設定ファイルを置いておきます。クライアントに送らせないのが一番ですが、サーバですぐ対応できるので。/etc/rsyslog.d/50-collector-drop.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# # Collector DROP # if \ $fromhost-ip != '127.0.0.1' and \ ( \ $programname == 'sshd' and \ ( \ $msg startswith ' [LDAP] no keys found' or \ $msg startswith ' pam_ldap:' \ ) \ ) \ then ~ |
MySQLに保存する設定
クライアント同様Queueを利用し、MySQLのダウンに備えます。独自テーブルに合わせてテンプレートを作成し、リモートからきたログのみMySQLに保存します。
%msg% は先頭のスペースを削除しています。
/etc/rsyslog.d/51-collector-mysql.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# # Queue # # Where to place spool and state files $WorkDirectory /var/spool/rsyslog # unique name prefix for spool files $ActionQueueFileName queue # Disk space limit $ActionQueueMaxDiskSpace 100m # save messages to disk on shutdown $ActionQueueSaveOnShutdown on # run asynchronously $ActionQueueType LinkedList # infinite retries if host is down $ActionResumeRetryCount -1 # # MySQL # $ModLoad ommysql # Original MySQL Template $template RemoteEvents, \ "INSERT INTO RemoteEvents \ (ReportedAt, ReceivedAt, \ HostName, IPAddress, \ Facility, FacilityText, Priority, PriorityText, \ Program, Message) \ VALUES \ ('%timereported:::date-mysql%', '%timegenerated:::date-mysql%', \ '%hostname%', '%fromhost-ip%', \ '%syslogfacility%', '%syslogfacility-text%', '%syslogpriority%', '%syslogpriority-text%', \ '%programname%', '%msg:2:$%' \ ) \ ",SQL # save collected remote logs to MySQL if \ $fromhost-ip != '127.0.0.1' \ then :ommysql:10.20.30.40,Syslog,rsyslog,password;RemoteEvents & ~ |
反映&確認
再起動して反映します。
1 2 3 4 5 |
# 設定確認 rsyslogd -N1 # 再起動 /etc/init.d/rsyslog restart |
クライアントから適当にログを送って、MySQLデータを確認します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
mysql> SELECT * FROM RemoteEvents ORDER BY ID DESC LIMIT 1; *************************** 1. row *************************** ID: 3 ReportedAt: 2013-09-09 17:47:39 ReceivedAt: 2013-09-09 17:47:39 HostName: gedowfather-debian7-01 IPAddress: 210.123.210.123 Facility: 10 FacilityText: authpriv Priority: 6 PriorityText: info Program: su Message: pam_unix(su:session): session closed for user GedowFather 1 row in set (0.00 sec) |
デフォルト構成
付属のテーブル構成・デフォルトテンプレートを確認しておきます。テーブル名は SystemEvents になっています。
スキーマ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
mysql> show create table SystemEvents \G *************************** 1. row *************************** Table: SystemEvents Create Table: CREATE TABLE `SystemEvents` ( `ID` int(10) unsigned NOT NULL AUTO_INCREMENT, `CustomerID` bigint(20) DEFAULT NULL, `ReceivedAt` datetime DEFAULT NULL, `DeviceReportedTime` datetime DEFAULT NULL, `Facility` smallint(6) DEFAULT NULL, `Priority` smallint(6) DEFAULT NULL, `FromHost` varchar(60) DEFAULT NULL, `Message` text, `NTSeverity` int(11) DEFAULT NULL, `Importance` int(11) DEFAULT NULL, `EventSource` varchar(60) DEFAULT NULL, `EventUser` varchar(60) DEFAULT NULL, `EventCategory` int(11) DEFAULT NULL, `EventID` int(11) DEFAULT NULL, `EventBinaryData` text, `MaxAvailable` int(11) DEFAULT NULL, `CurrUsage` int(11) DEFAULT NULL, `MinUsage` int(11) DEFAULT NULL, `MaxUsage` int(11) DEFAULT NULL, `InfoUnitID` int(11) DEFAULT NULL, `SysLogTag` varchar(60) DEFAULT NULL, `EventLogType` varchar(60) DEFAULT NULL, `GenericFileName` varchar(60) DEFAULT NULL, `SystemID` int(11) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1 1 row in set (0.00 sec) |
データ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
mysql> select * from SystemEvents \G *************************** 1. row *************************** ID: 1 CustomerID: NULL ReceivedAt: 2013-08-07 18:31:32 DeviceReportedTime: 2013-08-07 18:31:32 Facility: 4 Priority: 6 FromHost: gedowfather-debian7-01 Message: Accepted publickey for GedowFather from 192.168.123.45 port 41199 ssh2 NTSeverity: NULL Importance: NULL EventSource: NULL EventUser: NULL EventCategory: NULL EventID: NULL EventBinaryData: NULL MaxAvailable: NULL CurrUsage: NULL MinUsage: NULL MaxUsage: NULL InfoUnitID: 1 SysLogTag: sshd[11164]: EventLogType: NULL GenericFileName: NULL SystemID: NULL 1 row in set (0.00 sec) |
テンプレート
1 2 3 4 5 6 |
$template tpl, "insert into SystemEvents \ (Message, Facility, FromHost, Priority, DeviceReportedTime, ReceivedAt, InfoUnitID, SysLogTag) \ values \ ('%msg%', %syslogfacility%, '%HOSTNAME%', %syslogpriority%, '%timereported:::date-mysql%', '%timegenerated:::date-mysql%', %iut%, '%syslogtag%')", \ SQL |
プロパティの確認
リンク
プロパティ確認用の設定
プロパティが実際に保存する値がどんなものか確認するための設定です。ドキュメントにある項目をズラズラ並べただけです。
$now なども %$now% と同じように % で挟んで使用します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
$template AvailableProperties, \ "%msg%, %rawmsg%, %hostname%, %source%, %fromhost%, %fromhost-ip%, \ %syslogtag%, %programname%, %pri%, %pri-text%, %iut%, %syslogfacility%, \ %syslogfacility-text%, %syslogseverity%, %syslogseverity-text%, %syslogpriority%, \ %syslogpriority-text%, %timegenerated%, %timereported%, %timestamp%, %protocol-version%, \ %structured-data%, %app-name%, %procid%, %msgid%, %parsesuccess%, %inputname%, \ %$bom%, %$uptime%, %$now%, %$year%, %$month%, %$day%, \ %$hour%, %$hhour%, %$qhour%, %$minute%, %$myhostname%, %$!<name>%, %$!all-json%" /var/log/custom.log;AvailableProperties |
プロパティごとの保存値
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
%msg% Accepted publickey for GedowFather from 192.168.123.45 port 50746 ssh2 %rawmsg% <38>Aug 8 11:35:49 gedowfather-debian7-01 sshd[22000]: Accepted publickey for GedowFather from 192.168.123.45 port 50746 ssh2 %hostname% gedowfather-debian7-01 %source% gedowfather-debian7-01 %fromhost% 172.30.40.11 %fromhost-ip% 172.30.40.11 %syslogtag% sshd[22000]: %programname% sshd %pri% 38 %pri-text% auth.info<38> %iut% 1 %syslogfacility% 4 %syslogfacility-text% auth %syslogseverity% 6 %syslogseverity-text% info %syslogpriority% 6 %syslogpriority-text% info %timegenerated% Aug 8 11:35:49 %timereported% Aug 8 11:35:49 %timestamp% Aug 8 11:35:49 %protocol-version% 0 %structured-data% -, %app-name% sshd %procid% 22000 %msgid% -, %parsesuccess% **INVALID PROPERTY NAME**parsesuccess**INVALID PROPERTY NAME** (This property is available since version 6.3.8.) %inputname% imtcp %$bom% %$uptime% **INVALID PROPERTY NAME**$uptime**INVALID PROPERTY NAME** %$now% 2013-08-08 %$year% 2013 %$month% 08 %$day% 08 %$hour% 11 %$hhour% 01 %$qhour% 02 %$minute% 35 %$myhostname% gedowfather-rsyslog-01 %$!<name>% **INVALID PROPERTY NAME**$!<name>**INVALID PROPERTY NAME** %$!all-json% **INVALID PROPERTY NAME**$!all-json**INVALID PROPERTY NAME** |
注意点
IPアドレス
ソースIPアドレスとなる %fromhost% と %fromhost-ip% の違いはドキュメントに書いてあるからいいとして、実際に保存されるIPアドレスはL3層でのrsyslogサーバ接続ソースアドレスになるので、NATされているとNAT時のアドレスが記録されることになります。要望的にはログを記録したクライアントのプライベートアドレスがベストなのですが、VPN限定にでもしない限り仕組み的にどうしようもないので気持ち記録しておく程度になります。
先頭スペース
%msg% と %app-name% の先頭にスペースが入ります。%app-name% は %programname% を使えばいいですが、%msg% は使わざるを得ないので、気持ち悪いですが保存時に2文字目から終端までを使うことになります。バックアップ
蓄積データなので、バックアップはテーブル・DB丸ごと取ると無駄になります。スクリプトの抜粋になりますが、こんな感じで -w でConditionsを指定して一日分抽出しています。
1 2 3 4 5 6 |
mysqldump \ -h $MYSQL_HOST -u $MYSQL_USER -p$MYSQL_PASS \ -w "ReportedAt BETWEEN '$DATE 00:00:00' AND '$DATE 23:59:59'" \ --compact --single-transaction --master-data=2 \ $MYSQL_DB $MYSQL_TABLE \ | pbzip2 > $BACKUP_FILE |
本当は転送遅延を考慮して ReceivedAt で条件付けた方が正しいかもですが、ジャスト0時にとるわけじゃないし、その辺は重要度とかインデックスの都合でよしなに・・・
古いデータの削除
特に自動化せず、ディスク容量のアラートがきたら削除する程度でよいかと。解析
詳しくは省きますが、1分に1回の検知系と、1日1回の集計系を仕込んでいます。検知したいログが発見されたら、ほぼ同じスキーマの別テーブルに INSERT INTO ~ SELECT でコピーしておきつつ、詳細を記載した検知アラートを出すなどしています。
syslog-ng と両方使ったことがありますが、rsyslog の方が圧倒的に取っ付きやすくて好きですね。
地味なところですけど、ちゃんと理解して使えるようになっておきたいところです。