Introduction
The new Flux language introduced in InfluxDB v2 addresses many InfluxQL language limitations.
Let’s look at some of the features now possible with the Flux language on time series data: joins, pivots, histograms…
For most topics, real use cases are discussed: joining data when timestamps differ by a few seconds, simulating outer joins while waiting for the new join methods to come (left, right…), building histograms with differences instead of cumulative data…
Using SQL data sources in the Flux language is not addressed here; this topic deserves a dedicated article: InfluxDB v2, Flux language and SQL databases.
- For beginners in the Flux language who are used to InfluxQL: InfluxDB - Moving from InfluxQL to Flux language
- The sections of this paper are also summarized in a reference guide/cheat sheet: InfluxDB v2 : Flux language, quick reference guide and cheat sheet
InfluxDB v2, Quick reminders
Output format
It’s important to understand the InfluxDB v2 raw data output format, which differs from the v1 output format. In the output below, the filter is set only on the measurement name; filters on tag keys and field keys are not applied.
from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vpsmetrics")
|> yield()
table _start _stop _time _value _field _measurement host location
----- -------------------- --------------------- ------------------------------ ------ ------ ------------ ------------ --------
0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:06.446501067Z 1182 mem vpsmetrics vpsfrsqlpac1 france
0 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:16.604175869Z 817 mem vpsmetrics vpsfrsqlpac1 france
…
1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:06.446501067Z 62 pcpu vpsmetrics vpsfrsqlpac1 france
1 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:16.604175869Z 66 pcpu vpsmetrics vpsfrsqlpac1 france
…
2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:07.420674651Z 429 mem vpsmetrics vpsfrsqlpac2 france
2 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:17.176860469Z 464 mem vpsmetrics vpsfrsqlpac2 france
…
3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:07.420674651Z 29 pcpu vpsmetrics vpsfrsqlpac2 france
3 2021-02-05T00:00:00Z 2021-02-05T23:59:00Z 2021-02-05T03:00:17.176860469Z 32 pcpu vpsmetrics vpsfrsqlpac2 france
- A table identifier is applied to each result set.
- The time range is fully described by the _start and _stop columns.
- The measurement name is in the _measurement column.
- The field key and its value are respectively in the _field and _value columns.
- Tag key columns are displayed at the end.
We may not want all the columns of the raw data output format: columns can be removed using the drop or keep functions.
|> keep(columns: ["_value", "_time"])
|> drop(fn: (column) => column =~ /^_(start|stop)/)
Columns are renamed using the rename function:
|> keep(columns: ["_value", "_time"])
|> rename(columns: {_value: "pcpu", _time: "when"})
Querying data
In this article, Flux queries are executed with one of the two following methods:
- The influx client command line.
- The integrated InfluxDB v2 GUI, https://<host-influxdb>:8086 (formerly Chronograf in the InfluxDB v1 TICK stack suite).
influx client
$ influx query --file query.flux
To be able to use the influx client, the config (url, token, org…) is defined beforehand.
$ export INFLUX_CONFIGS_PATH=/sqlpac/influxdb/srvifx2sqlpac/configs
$ export INFLUX_ACTIVE_NAME=default
/sqlpac/influxdb/srvifx2sqlpac/configs:
[default]
url = "https://vpsfrsqlpac:8086"
token = "K2YXbGhIJIjVhL…"
org = "sqlpac"
active = true
InfluxDB v2 GUI
In the InfluxDB GUI, use the "View Raw Data" toggle button to display the queries' raw result sets.
Joins
In this use case, the CPU and memory used percentages per host are stored respectively in the vps_pcpu and vps_pmem measurements:
Reading the documentation, one may conclude that it is easy to join data with the join function!
datapcpu = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pcpu" )
|> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
|> keep(columns: ["_measurement","_time","_field","_value","host"])
datapmem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pmem" )
|> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
|> keep(columns: ["_measurement","_time","_field","_value","host"])
join(
tables: {pcpu:datapcpu, pmem:datapmem},
on: ["_time","host"],
method: "inner"
)
|> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
Executing the query: no results! Why?
Normalizing irregular timestamps
In real systems, joining on the _time column is always problematic. The _time column usually stores seconds, milliseconds, microseconds… values that differ very slightly between two points, so the join matches nothing.
First solution: use the truncateTimeColumn function to normalize irregular timestamps. This function truncates all _time values to a specified unit. In the example above, _time values are now truncated to the minute: the join works.
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> truncateTimeColumn(unit: 1m)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> truncateTimeColumn(unit: 1m)

join(
  tables: {pcpu: datapcpu, pmem: datapmem},
  on: ["_time","host"],
  method: "inner"
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
_time:time                      _value_pcpu:float  _value_pmem:float
------------------------------  -----------------  -----------------
2021-03-01T17:22:00.000000000Z  63                 26.8
2021-03-01T17:23:00.000000000Z  73                 29.725
2021-03-01T17:24:00.000000000Z  57                 22.325
If technically and functionally acceptable, the aggregateWindow function is an alternative solution.
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)

join(
  tables: {pcpu: datapcpu, pmem: datapmem},
  on: ["_time","host"],
  method: "inner"
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
_time:time                      _value_pcpu:float  _value_pmem:float
------------------------------  -----------------  -----------------
2021-03-01T17:23:00.000000000Z  63                 26.8
2021-03-01T17:24:00.000000000Z  73                 29.725
2021-03-01T17:25:00.000000000Z  57                 22.325
Join methods and limitations
In the previous use case, the join is performed using the inner method.
join(
tables: {pcpu:datapcpu, pmem:datapmem},
on: ["_time","host"],
method: "inner"
)
Only the inner method is currently allowed (v2.0.4). In future releases, other join methods will be gradually implemented: cross, left, right, full.
Another limitation: joins currently must have only two parents. This limit should also be extended in future releases.
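While waiting, more than two streams can still be combined by chaining join calls, the output of a first join becoming a parent of the next one. A minimal sketch, reusing datapcpu and datapmem from above and assuming a hypothetical vps_pdisk measurement of the same shape:

// Hypothetical third stream (vps_pdisk is an assumed measurement)
datapdisk = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pdisk" and r.host == "vpsfrsqlpac1")
  |> keep(columns: ["_time", "_value", "host"])
  |> truncateTimeColumn(unit: 1m)

// First join: two parents only
cpumem = join(
  tables: {pcpu: datapcpu, pmem: datapmem},
  on: ["_time", "host"],
  method: "inner"
)

// Second join: the result of the first join becomes a parent of the next one
join(
  tables: {cpumem: cpumem, pdisk: datapdisk},
  on: ["_time", "host"],
  method: "inner"
)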
How to manage outer joins while waiting for the other methods to be ready?
The combination of the aggregateWindow and fill functions is a potential workaround to manage outer joins for the moment. The fill function replaces NULL values with a default value; the value applied will depend on the technical context and the desired rendering (charts…).
Previous value | Fixed value
---|---
fill(column: "_value", usePrevious: true) | fill(column: "_value", value: 0.0)
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)
  |> fill(column: "_value", value: 0.0)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" )
  |> filter(fn: (r) => r.host == "vpsfrsqlpac1" )
  |> keep(columns: ["_measurement","_time","_field","_value","host"])
  |> aggregateWindow(every: 1m, fn: mean)
  |> fill(column: "_value", value: 0.0)

join(
  tables: {pcpu: datapcpu, pmem: datapmem},
  on: ["_time","host"]
)
  |> keep(columns: ["_value_pmem", "_value_pcpu", "_time"])
_time:time                      _value_pcpu:float  _value_pmem:float
------------------------------  -----------------  -----------------
2021-03-01T18:23:00.000000000Z  76                 26
2021-03-01T18:24:00.000000000Z  63                 24.65
2021-03-01T18:25:00.000000000Z  54                 0.0
2021-03-01T18:26:00.000000000Z  69                 0.0
2021-03-01T18:27:00.000000000Z  61                 0.0
2021-03-01T18:28:00.000000000Z  79                 23.4
2021-03-01T18:29:00.000000000Z  65                 21.8
Pivot
The default output format is really not convenient for some specific further processing (writing to another measurement, to a SQL database…).
from(bucket: "netdatatsdb/autogen") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "vps_pcpumem" and r.host == "vpsfrsqlpac1" ) |> keep(columns: ["_measurement","_time","_field","_value","host"])
Table: keys: [_field, _measurement, host]
_field:string  _measurement:string  host:string   _time:time                      _value:float
-------------  -------------------  ------------  ------------------------------  ------------
mem            vps_pcpumem          vpsfrsqlpac1  2021-03-01T18:50:46.919579473Z  1055
mem            vps_pcpumem          vpsfrsqlpac1  2021-03-01T18:51:47.359821389Z  1069
…              …                    …             …                               …

Table: keys: [_field, _measurement, host]
_field:string  _measurement:string  host:string   _time:time                      _value:float
-------------  -------------------  ------------  ------------------------------  ------------
pcpu           vps_pcpumem          vpsfrsqlpac1  2021-03-01T18:50:46.919579473Z  68
pcpu           vps_pcpumem          vpsfrsqlpac1  2021-03-01T18:51:47.359821389Z  57
…              …                    …             …                               …
Pivoting data to format the output was a major missing feature in InfluxQL. The feature is ready and easy to use in the Flux language:
from(bucket: "netdatatsdb/autogen") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "vps_pcpumem" and r.host == "vpsfrsqlpac1") |> keep(columns: ["_measurement","_time","_field","_value","host"]) |> pivot( rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value" )
Table: keys: [_measurement, host]
_measurement:string  host:string   _time:time                      mem:float  pcpu:float
-------------------  ------------  ------------------------------  ---------  ----------
vps_pcpumem          vpsfrsqlpac1  2021-03-01T18:50:46.919579473Z  1055       68
vps_pcpumem          vpsfrsqlpac1  2021-03-01T18:51:47.359821389Z  1069       57
…                    …             …                               …          …
As with joins, use the truncateTimeColumn function to normalize timestamps if needed.
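For example, a minimal sketch truncating timestamps to the minute before pivoting (same vps_pcpumem measurement as above):

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and r.host == "vpsfrsqlpac1")
  // Normalize irregular timestamps before pivoting on _time
  |> truncateTimeColumn(unit: 1m)
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")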
Use the schema.fieldsAsCols() shortcut function when pivoting data only on _time/_field:
import "influxdata/influxdb/schema"
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pcpumem" and
r.host == "vpsfrsqlpac1" )
|> keep(columns: ["_measurement","_time","_field","_value","host"])
|> schema.fieldsAsCols()
Histograms
Producing histograms with a query is made simple in the Flux language with the histogram and linearBins functions. The linearBins function eases the creation of the list of linearly separated floats ([0.0, 5.0, 10.0, 15.0, …]).
from(bucket:"netdatatsdb/autogen") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "vps_pcpumem" and r.host == "vpsfrsqlpac1" and r._field == "pcpu" ) |> histogram( bins: linearBins(start: 0.0, width: 5.0, count: 21, infinity: false) ) |> keep(columns: ["le","_value"])
le:float  _value:float
--------  ------------
0         0
5         0
10        0
15        0
20        0
25        0
30        0
35        0
40        42
45        206
50        389
55        547
60        716
65        910
70        1085
75        1260
80        1427
85        1427
90        1427
95        1427
100       1427
A column named le (less than or equal) is automatically created; data in the _value column are cumulative.
Apply the difference function to cancel the cumulative mode and thus build a typical histogram:
|> histogram(bins: linearBins(start: 0.0, width: 5.0, count: 21, infinity: false))
|> difference()
|> keep(columns: ["le","_value"])
le:float  _value:float
--------  ------------
5         0
10        0
15        0
20        0
25        0
30        0
35        0
40        42
45        161
50        183
55        156
60        172
65        195
70        175
75        175
80        169
85        0
90        0
95        0
100       0
Logarithmic mode is performed with the logarithmicBins function:
|> histogram(
     bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
                           infinity: false)
   )
The histogram function is of interest when we need to build histograms by programming for further processing. The InfluxDB GUI and Grafana do not need the histogram function.
In the InfluxDB GUI, to visualize a histogram:
- The query source is used without the histogram function call:
  from(bucket:"netdatatsdb/autogen")
    |> range(start: -1d)
    |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                         r.host == "vpsfrsqlpac1" and
                         r._field == "pcpu" )
- Select "Histogram" graph type and click on the button "Customize".
- Define the X column (_value), the number of bins (20) and the X max value (100).
In Grafana, the methodology is the same: the query source is used without the histogram function call.
Computed columns - map
Computed columns are created with the map function, which is usually called after a join or a pivot operation.
In the example below, the vps_space measurement stores two fields per point: used space (used) and available space (available). In the final result set, we want to compute the used space percentage in a column named pctspace:
from(bucket: "netdatatsdb/autogen") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "vps_space" and r.host == "vpsfrsqlpac1" and (r._field == "used" or r._field == "available") ) |> pivot( rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value" ) |> map(fn: (r) => ({ r with pctspace: (r.used / (r.used + r.available)) * 100.0 })) |> keep(columns: ["_time", "pctspace"])
_time:time                      pctspace:float
------------------------------  -----------------
2021-03-01T17:51:47.437958334Z  73.84712268752916
2021-03-01T17:52:47.900410821Z  73.84713257521788
2021-03-01T17:53:48.389130505Z  73.8471523505953
The with operator is very important when calling the map function. In the default basic syntax (without the with operator), columns that are not part of the input table's group key (_time, _start, _stop…) and not explicitly mapped are removed:
|> map(fn: (r) => ({ pctspace: (r.used / (r.used + r.available)) * 100.0 }))
Table: keys: []
pctspace:float
-----------------
73.84734021668079
73.84734021668079
The with operator updates a column if it already exists, creates a new column if it doesn't exist, and includes all existing columns in the output table.
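For example, a minimal sketch (hypothetical expressions, reusing the pctspace column from above) where pctspace is updated and a new alert column is created; both expressions read the input row r:

|> map(fn: (r) => ({ r with
       pctspace: r.pctspace / 100.0, // updates the existing column (ratio instead of percentage)
       alert: r.pctspace > 90.0      // creates a new boolean column from the input value
   }))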
Cross computed columns
Computed columns referencing other computed columns cannot be defined in a single map function call:
|> map(fn: (r) => ({ r with
       totalspace: r.used + r.available,
       pctspace: (r.used / r.totalspace) * 100.0
   }))
|> keep(columns: ["_time", "used", "totalspace", "pctspace"])
_time:time                      pctspace:float  totalspace:float  used:float
------------------------------  --------------  ----------------  ----------
2021-03-02T18:33:36.041221773Z                  40454348          29889216
2021-03-02T18:34:36.532166008Z                  40454348          29889220
In this specific context, the map function must be called sequentially:
|> map(fn: (r) => ({ r with totalspace: r.used + r.available }))
|> map(fn: (r) => ({ r with pctspace: (r.used / r.totalspace) * 100.0 }))
|> keep(columns: ["_time", "used", "totalspace", "pctspace"])
_time:time                      pctspace:float     totalspace:float  used:float
------------------------------  -----------------  ----------------  ----------
2021-03-02T18:33:36.041221773Z  73.88386533877645  40454348          29889236
2021-03-02T18:34:36.532166008Z  73.88388511415386  40454348          29889244
Conditions, exists
Conditions can be defined in the map function and the exists operator is available:
import "math" … |> map(fn: (r) => ({ r with totalspace: r.used + r.available })) |> map(fn: (r) => ({ r with pctspace: (r.used / r.totalspace) * 100.0 })) |> aggregateWindow(every: 1m, fn: mean, column: "pctspace") |> map(fn: (r) => ({ r with pctspace: if exists r.pctspace then "${string(v: math.round(x: r.pctspace) )} %" else "N/A" })) |> keep(columns: ["_time", "used","pctspace"])
_time:time                      pctspace:string
------------------------------  ---------------
2021-03-02T14:35:00.000000000Z  74 %
2021-03-02T14:36:00.000000000Z  N/A
Custom aggregate functions - reduce
The mean, max, min, sum, count… Flux functions are good shortcuts to aggregates, but unfortunately only one aggregate can be returned.
from(bucket: "netdatatsdb/autogen") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "vps_space" and r.host == "vpsfrsqlpac1" and (r._field == "used" or r._field == "available") ) |> pivot( rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value" ) |> count(column: "used")
used:int
--------
1428
Multiple custom aggregate results can be extracted using the reduce function, which returns a single row storing the aggregates, with no _time column.
from(bucket: "netdatatsdb/autogen") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "vps_space" and r.host == "vpsfrsqlpac1" and (r._field == "used" or r._field == "available") ) |> pivot( rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value" ) |> map(fn: (r) => ({ r with totalspace: r.used + r.available })) |> map(fn: (r) => ({ r with pctspace: (r.used / r.totalspace) * 100.0 })) |> reduce(fn: (r, accumulator) => ( { count: accumulator.count + 1, max: if r.pctspace > accumulator.max then r.pctspace else accumulator.max }), identity: {count: 0, max: 0.0} ) |> keep(columns: ["count","max"])
count:int  max:float
---------  -----------------
1428       73.93648761809237
identity defines the initial values as well as the data types (0 for an integer, 0.0 for a float).
As in the map function, conditions and the exists operator are allowed.
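For example, a minimal sketch (hypothetical 70% threshold, reusing the pctspace column computed above) counting both the total number of points and those exceeding the threshold:

|> reduce(fn: (r, accumulator) => ({
       count: accumulator.count + 1,
       // Increment over70 only when pctspace exists and exceeds the threshold
       over70: if exists r.pctspace and r.pctspace > 70.0
               then accumulator.over70 + 1
               else accumulator.over70
   }),
   identity: {count: 0, over70: 0}
)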
One may comment: what a stupid example! The reduce function computes aggregates available with native Flux functions, so why reinvent the wheel? Indeed, the reduce function should be used for custom aggregate computations for which no native function exists.
The above example can be rewritten using the native functions count and max; a join is performed:
dataspace = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  …

datacnt = dataspace
  |> count(column: "pctspace")
  |> rename(columns: {"pctspace": "count"})

datamax = dataspace
  |> max(column: "pctspace")
  |> rename(columns: {"pctspace": "max"})

join(
  tables: {cnt: datacnt, max: datamax},
  on: ["_measurement"]
)
  |> keep(columns: ["count", "max"])
  |> yield()
count:int  max:float
---------  -----------------
1428       73.93870246036347
The above example would have been the following InfluxQL query with a subquery:
SELECT count(pctspace), max(pctspace)
FROM (SELECT (used / (used + available)) * 100 AS pctspace
      FROM "netdatatsdb"."autogen"."vps_space"
      WHERE (host='vpsfrsqlpac1' AND time > now() - 1d)
     )
Significant calendar function enhancements
hourSelection
Selecting specified hours of the day was not possible using InfluxQL. The Flux language supports DatePart-like queries with the hourSelection function, which returns only data with time values in a specified hour range.
from(bucket: "netdatatsdb/autogen")
|> range(start: -72h)
|> filter(fn: (r) => r._measurement == "vps_pcpumem" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> hourSelection(start: 8, stop: 18)
|> aggregateWindow(every: 1h, fn: mean)
Windowing data by month(s) and year(s)
The Flux language supports windowing data by calendar months (mo) and years (yr): 1mo, 3mo, 1yr…
This feature was not possible with InfluxQL either. It is useful for producing KPIs (Key Performance Indicators) per month, quarter, semester, or year.
|> aggregateWindow(every: 1mo, fn: mean)
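For example, a minimal sketch (reusing the vps_pcpumem measurement from the previous sections) computing a mean per 3 calendar months over the last year:

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1y)
  |> filter(fn: (r) => r._measurement == "vps_pcpumem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu")
  // One mean value per 3-calendar-month window
  |> aggregateWindow(every: 3mo, fn: mean)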