Querying data
- Defining the data source (bucket - database): from
from(bucket: "netdatatsdb/autogen")
- Time range, absolute or relative: range
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h, stop: -10m)
from(bucket: "netdatatsdb/autogen")
|> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)
- Filtering by measurement: filter
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
- Filtering by tag key:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
|> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
- Filtering by field and field value:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
|> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
|> filter(fn: (r) => r["_field"] == "pcpu")
|> filter(fn: (r) => r["_value"] > 80)
- Filters can be combined in a single filter clause with the and/or operators:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
r["host"] == "vpsfrsqlpac1" and
r["_field"] == "pcpu" and r["_value"] > 80
)
- Depending on preferences, dot notation is allowed:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and r._value > 80
)
- Displaying data: yield
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and r._value > 80 )
|> yield()
In the examples below, the from and range clauses are removed for brevity.
- Using regular expressions:
|> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
|> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )
- Top n records: limit
|> limit(n:10)
- Last n records: tail
|> tail(n:10)
- Sorting data: sort
|> sort(columns: ["_value"], desc: true)
- Renaming columns: rename
|> rename(columns: {_value: "average", _time: "when"})
- Removing output columns: drop
|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
- Selecting output columns: keep
|> keep(columns: ["_value", "_time"])
- A simple first Flux query before going further:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and
r._value > 80)
|> sort(columns: ["_value"], desc: true)
|> rename(columns: {_value: "average"})
|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
|> limit(n:10)
|> yield()
Windowing data
aggregateWindow
|> aggregateWindow(
every: 1m,
fn: mean,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
|> aggregateWindow(every: 10m, fn: mean)
To aggregate on a column other than the default _value column:
|> aggregateWindow(every: 10m, fn: mean, column: "colname")
To remove the null values, set createEmpty to false:
|> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
window
aggregateWindow is in fact a shortcut function using the window function.
|> window(every: 10m)
|> mean()
createEmpty is set to true to display null values:
|> window(every: 10m, createEmpty: true)
|> mean()
The _time column is then not included in the output table. To re-add a _time column for further processing, the duplicate function is called to duplicate the _start or _stop column as the new _time column:
|> window(every: 10m, createEmpty: true)
|> mean()
|> duplicate(column: "_stop", as: "_time")
To recover the regular format, data are finally "unwindowed":
|> window(every: 10m, createEmpty: true)
|> mean()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
fill
Optionally, use the fill function to handle empty values when createEmpty is set to true when windowing data:
|> fill(column: "_value", value: 0.0)
|> fill(column: "_value", usePrevious: true)
- Window by calendar months and years
Flux supports windowing data by calendar months and years: 1mo, 1y. This feature was not possible with InfluxQL.
|> aggregateWindow(every: 1mo, fn: mean)
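A yearly aggregation follows the same pattern, for example (a minimal sketch, assuming the queried range spans several years):
|> aggregateWindow(every: 1y, fn: mean)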
Hour Selection
hourSelection
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> hourSelection(start: 8, stop: 18)
Joins
join
datapcpu = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pcpu" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" )
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
datapmem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pmem" and
r.host == "vpsfrsqlpac1" and
r._field == "pmem" )
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
join(
tables: {pcpu:datapcpu, pmem:datapmem},
on: ["_time", "host"],
method: "inner"
)
Only the inner method is currently allowed (v2.0.4). In future releases, other join methods will be gradually implemented: cross, left, right, full.
Performing joins on _time columns storing seconds, microseconds, nanoseconds… is addressed by normalizing irregular timestamps using the truncateTimeColumn function, which truncates all _time values to a specified unit:
|> truncateTimeColumn(unit: 1m)
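As a minimal sketch, reusing the datapcpu/datapmem streams from the join example above, both _time columns are truncated to the second so that points collected at slightly different sub-second timestamps match:
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" )
  |> truncateTimeColumn(unit: 1s)
datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pmem" )
  |> truncateTimeColumn(unit: 1s)
join(
  tables: {pcpu:datapcpu, pmem:datapmem},
  on: ["_time", "host"],
  method: "inner"
)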
Pivot
pivot
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" )
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
As with joins, use the truncateTimeColumn function to normalize timestamps.
Use the schema.fieldsAsCols() shortcut function when pivoting data only on _time/_field:
import "influxdata/influxdb/schema"
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" )
|> schema.fieldsAsCols()
Histograms
histogram
from(bucket:"netdatatsdb/autogen")
|> range(start: -3h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> histogram(
bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
)
Use linearBins to ease the creation of the list of linearly separated floats:
|> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
A list of logarithmically separated floats is defined with logarithmicBins:
|> histogram(
bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
infinity: true)
)
By default, values are cumulative in the _value column; apply the difference function to override the cumulative computation:
|> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
|> difference()
histogram               histogram + difference
le       _value         le       _value
-----    ------         -----    ------
50       292            50       269
60       536            60       241
70       784            70       247
Computed columns (map)
map
|> map(fn: (r) => ({ _value: r._value * 2.0 }))
The basic usage of the map function removes the columns that are not part of the input table's group key (_time, _start, _stop…) and not explicitly mapped; use the with operator to avoid their removal:
|> map(fn: (r) => ({ r with _value: r._value * 2.0 }))
The with operator updates a column if it already exists, creates a new column if it doesn't exist, and includes all existing columns in the output table.
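For example, a sketch adding a new column while keeping the existing ones (the status column name is purely illustrative):
|> map(fn: (r) => ({ r with status: if r._value > 80.0 then "high" else "normal" }))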
Custom aggregate functions (reduce)
reduce
|> reduce(fn: (r, accumulator) => ({
count: accumulator.count + 1,
total: accumulator.total + r._value,
avg: (accumulator.total + r._value) / float(v: accumulator.count)
}),
identity: {count: 1, total: 0.0, avg: 0.0}
)
identity defines the initial values as well as the data types. A single row storing the aggregates is returned, with no _time column.
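For instance, a minimal sketch computing a maximum with reduce; identity both seeds the value and forces a float type (a hypothetical example, assuming non-negative values such as percentages):
|> reduce(
    fn: (r, accumulator) => ({
      max: if r._value > accumulator.max then r._value else accumulator.max
    }),
    identity: {max: 0.0}
)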
Writing data
to
|> to(bucket:"history", org:"sqlpac")
To write to the same bucket, first use the set function to define the measurement name:
|> set(key: "_measurement", value: "history_vpsmetrics")
|> to(bucket:"netdatatsdb/autogen", org:"sqlpac")
SQL Data sources
- Metadata information (username, password…)
Use influx secret or curl to store metadata:
$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"
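With curl, secrets are managed through the /api/v2/orgs/{orgID}/secrets endpoint; a sketch, assuming a local server on port 8086, where the organization id and the API token are placeholders:
$ curl -XPATCH "http://localhost:8086/api/v2/orgs/<org-id>/secrets" \
       --header "Authorization: Token <api-token>" \
       --header "Content-Type: application/json" \
       --data '{"PG_HOST": "vpsfrsqlpac", "PG_PORT": "5432"}'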
Retrieving secrets in a Flux script using the secrets package (check the read:secrets authorization):
import "influxdata/influxdb/secrets"
PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT")
sql.from: retrieving data
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
query: "SELECT name, totalmemory FROM vps"
)
Available drivers in InfluxDB v2.0.4 (more to come in future releases):
- awsathena
- bigquery
- mysql
- postgres
- snowflake
- sqlserver, mssql
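The dataSourceName format depends on the driver. For example, a MySQL source would look like the sketch below (the MY_* secrets and the database name are assumptions for illustration):
datamysql = sql.from(
  driverName: "mysql",
  dataSourceName: "${MY_USER}:${MY_PASS}@tcp(${MY_HOST}:3306)/dbname",
  query: "SELECT name, totalmemory FROM vps"
)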
- Joining data
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
query: "SELECT name, totalmemory FROM vps"
)
|> rename(columns : {name: "host"})
datamem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vpsmetrics"
and r._field == "mem"
and r.host == "vpsfrsqlpac1")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
join(
tables: {vps:datavps, mem:datamem},
on: ["host"],
method: "inner"
)
|> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))
sql.to: writing data
Check the structure of the table receiving the data: data types, column names, null/not null.
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vps_pmem"
and r._field == "pmem"
and r.host == "vpsfrsqlpac2")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> rename(columns: {_value: "pmem", _time: "dth"})
|> keep(columns: ["host", "dth", "pmem"])
|> sql.to(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
table: "vpspmem"
)