zkat’s diary

技術ブログ

InfluxdbのContinuous Query

はじめに

測定した生のデータをそのままInfluxDBに貯めて、各種操作をすると重すぎるので適当な時間間隔でデータをダウンサンプリングすることにしました。 公式ドキュメントはこちらです。 docs.influxdata.com

Continuous Queryでgroup by timeすることで、ダウンサンプリングします。

シンタックス

Continuous Queryのシンタックスを公式ドキュメントから引用します。

CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
BEGIN
    SELECT <function[s]> 
    INTO <destination_measurement> 
    FROM <measurement> 
    [WHERE <stuff>] 
    GROUP BY time(<interval>)[,<tag_key[s]>]
END

ダウンサンプルした結果は別の適当なretention policydbで保持したくなると思います。 <destination_measurement>は、"dbname"."retention policy name"."measurement name"の形で保存先を指定することができます。 retention policyはあらかじめ作っておく必要があります。 データ取得元の<measurement>も同じ形で指定します。

実行したContinuous Query

show continuous queryすると実行したクエリを表示できます。 time_tookについて5分間の平均をとって格納します。保存先に、"1day"という1日だけデータを保持するretention policyを設定しているので、 古くなったデータは順次消えていきます。

CREATE CONTINUOUS QUERY  cq_downsampled_successed_5min_1day ON downsampled_dnsprobe
BEGIN 
    SELECT mean(time_took)
    INTO downsampled_dnsprobe."1day".mes_cq_downsampled_successed_5min_1day 
    FROM dnsprobe.thirty_five_days.dnsprobe 
    WHERE got_response = 'True' 
    GROUP BY  time(5m), prb_id, af, proto, dst_name, rrtype 
END

データ取得元のdnsprobeは以下のようなスキーマです。

> show field keys
name: dnsprobe
fieldKey       fieldType
--------       ---------
data           string
id             integer
mname          string
name           string
probe_asn      string
probe_asn_desc string
probe_uptime   string
reason         string
rname          string
serial         integer
time_took      float
ttl            integer
type           string

> show tag keys
name: dnsprobe
tagKey
------
af
dst_addr
dst_name
error_class_name
got_response
nsid
prb_id
prb_lat
prb_lon
proto
qname
rrtype
src_addr