InfluxDB v2 : Flux language, quick reference guide and cheat sheet

Querying data

  • Defining the data source (bucket - database) : from
from(bucket: "netdatatsdb/autogen")
  • Time range, absolute or relative : range
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h, stop: -10m)
              
from(bucket: "netdatatsdb/autogen")
  |> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)
  • Filtering by measurement : filter
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  • Filtering by tag and tag value :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
  • Filtering by field and field value :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
  |> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
  |> filter(fn: (r) => r["_field"] == "pcpu")
  |> filter(fn: (r) => r["_value"] > 80)
  • Filters can be combined in one filter clause with the and / or operators :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
                       r["host"] == "vpsfrsqlpac1" and
                       r["_field"] == "pcpu" and r["_value"] > 80
  )
  • Depending on preferences, dot notation is allowed :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80
  )
  • Displaying data : yield
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and r._value > 80 )
  |> yield()

In the examples below, the from and range clauses are omitted for brevity.

  • Using regular expressions :
  |> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
             
  |> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )
  • First n records : limit
  |> limit(n:10)
  • Last n records : tail
  |> tail(n:10)
  • Sorting data : sort
  |> sort(columns: ["_value"], desc: true)
  • Renaming a column : rename
  |> rename(columns: {_value: "average", _time: "when"})
  • Removing output columns : drop
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
  • Selecting output columns : keep
  |> keep(columns: ["_value", "_time"])
  • A simple first Flux query before going further :
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" and
                       r._value > 80)
  |> sort(columns: ["_value"], desc: true)
  |> rename(columns: {_value: "average"})
  |> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
  |> limit(n:10)
  |> yield()

Windowing data

  • aggregateWindow
  |> aggregateWindow(
      every: 1m,
      fn: mean,
      column: "_value",
      timeSrc: "_stop",
      timeDst: "_time",
      createEmpty: true
)
  |> aggregateWindow(every: 10m, fn: mean)

To aggregate on a different column than the default _value column :

  |> aggregateWindow(every: 10m, fn: mean, column: "colname")

To remove the null values, set createEmpty to false :

  |> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
  • window

aggregateWindow is in fact a shortcut built on the window function.

  |> window(every: 10m)
  |> mean()

Set createEmpty to true to display the null values :

  |> window(every: 10m, createEmpty: true)
  |> mean()

The _time column is then missing from the output tables. To re-add a _time column for further processing, call the duplicate function to copy the _start or _stop column as the new _time column :

  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")

To recover the regular format, data are finally "unwindowed" :

  |> window(every: 10m, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  • fill

Optionally, use the fill function to handle the null values produced when createEmpty is set to true while windowing data.

  |> fill(column: "_value", value: 0.0) 
  |> fill(column: "_value", usePrevious: true) 
  • Window by calendar months and years

Flux supports windowing data by calendar months and years : 1mo, 1y. This was not possible with InfluxQL.

  |> aggregateWindow(every: 1mo, fn: mean) 
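
Windowing by calendar year works the same way :

  |> aggregateWindow(every: 1y, fn: mean)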

Hour selection

  • hourSelection
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> hourSelection(start: 8, stop: 18)

Joins

  • join
datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pmem" )
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: false)

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time", "host"],
    method: "inner"
  )

Only the inner method is currently allowed (v2.0.4). Other join methods (cross, left, right, full) will be implemented gradually in future releases.

Joins on _time columns storing seconds, microseconds, nanoseconds… may fail to match rows. Normalize irregular timestamps with the truncateTimeColumn function, which truncates all _time values to a specified unit :

  |> truncateTimeColumn(unit: 1m)
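
A minimal sketch of the previous join with both streams truncated to the minute before joining (host filter dropped for brevity) :

datapcpu = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pcpu" and r._field == "pcpu")
  |> truncateTimeColumn(unit: 1m)

datapmem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vps_pmem" and r._field == "pmem")
  |> truncateTimeColumn(unit: 1m)

join(
    tables: {pcpu:datapcpu, pmem:datapmem},
    on: ["_time", "host"],
    method: "inner"
  )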

Pivot

  • pivot
from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> pivot(
       rowKey:["_time"],
       columnKey: ["_field"],
       valueColumn: "_value"
    )

As with joins, use the truncateTimeColumn function to normalize timestamps.
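
For instance, a sketch truncating timestamps to the minute just before pivoting :

  |> truncateTimeColumn(unit: 1m)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")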

Use the schema.fieldsAsCols() shortcut function when pivoting data only on _time/_field :

import "influxdata/influxdb/schema"

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" )
  |> schema.fieldsAsCols()

Histograms

  • histogram
from(bucket:"netdatatsdb/autogen")
  |> range(start: -3h)
  |> filter(fn: (r) => r._measurement == "vpsmetrics" and
                       r.host == "vpsfrsqlpac1" and
                       r._field == "pcpu"
  )
  |> histogram(
       bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
   )

Use linearBins to ease the creation of the list of linearly separated floats.

  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )

Logarithmic bins are defined with logarithmicBins :

  |> histogram(
       bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
                             infinity: true)
   )

By default, counts are cumulative in the _value column; apply the difference function to override the cumulative computation.

  |> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
  |> difference()
histogram             histogram + difference
le     _value        le     _value
-----  ------        -----  ------
50     292           50     269
60     536           60     241
70     784           70     247

Computed columns (map)

  • map
  |> map(fn: (r) => ({ _value: r._value * 2.0 }))

The basic usage of the map function removes the columns that are not part of the input table's group key and not explicitly mapped (_time for example); use the with operator to avoid their removal :

  |> map(fn: (r) => ({ r with _value: r._value * 2.0 }))

The with operator updates a column if it already exists, creates a new column if it doesn’t exist, and includes all existing columns in the output table.
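
For example, a sketch creating a new column (the column name ratio below is arbitrary) while keeping all existing columns, _value included :

  |> map(fn: (r) => ({ r with ratio: r._value / 100.0 }))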

Custom aggregate functions (reduce)

  • reduce
  |> reduce(fn: (r, accumulator) => ({
      count: accumulator.count + 1,
      total: accumulator.total + r._value,
      avg: (accumulator.total + r._value) / float(v: accumulator.count + 1)
    }),
    identity: {count: 0, total: 0.0, avg: 0.0}
  )

identity defines the initial values as well as the data types.

A single row storing the aggregates is returned with no _time column.
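
If a _time column is needed for further processing, one way is to duplicate the _stop column retained in the group key, as already seen with window :

  |> duplicate(column: "_stop", as: "_time")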

Writing data

  • to
  |> to(bucket:"history", org:"sqlpac")

To write in the same bucket, use the set function beforehand to define the target measurement name :

  |> set(key: "_measurement", value: "history_vpsmetrics")
  |> to(bucket:"netdatatsdb/autogen", org:"sqlpac")

SQL data sources

  • Metadata information (username, password…)

Use influx secret or curl to store metadata :

$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"
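
With curl, secrets are stored through the /api/v2 REST API; a sketch, where the organization id and the token are placeholders :

$ curl -XPATCH http://localhost:8086/api/v2/orgs/<orgID>/secrets \
       -H "Authorization: Token <token>" \
       -H "Content-Type: application/json" \
       -d '{"PG_HOST": "vpsfrsqlpac", "PG_PORT": "5432"}'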

Retrieving secrets in a Flux script using the secrets package (check the read:secrets authorization) :

import "influxdata/influxdb/secrets"

PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT") 
  • sql.from : retrieving data
import "sql"

import "influxdata/influxdb/secrets"
// Get secrets…

datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)

Available drivers in InfluxDB v2.0.4 (more to come in future releases); each driver expects its own dataSourceName format, as shown in the sketch after the list :

  • awsathena
  • bigquery
  • mysql
  • postgres
  • snowflake
  • sqlserver, mssql
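
A sketch of an equivalent mysql source, the connection names being placeholders (Go DSN syntax) :

datavps = sql.from(
  driverName: "mysql",
  dataSourceName: "${MY_USER}:${MY_PASS}@tcp(${MY_HOST}:3306)/dbname",
  query: "SELECT name, totalmemory FROM vps"
)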
  • Joining data
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…

datavps = sql.from(
  driverName: "postgres",
  dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
  query: "SELECT name, totalmemory FROM vps"
)
  |> rename(columns : {name: "host"})

datamem = from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vpsmetrics"
                       and r._field == "mem"
                       and r.host == "vpsfrsqlpac1")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

join(
    tables: {vps:datavps, mem:datamem},
    on: ["host"],
    method: "inner"
  )
  |> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))
  • sql.to : writing data

Check the structure of the table receiving the data : data types, column names, null/not null constraints.
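
For instance, a possible PostgreSQL definition for the vpspmem table fed below (an assumed structure, to adapt to the real context) :

CREATE TABLE vpspmem (
   host varchar(64) not null,
   dth  timestamptz not null,
   pmem float8
);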

import "sql"

import "influxdata/influxdb/secrets"
// Get secrets…

from(bucket: "netdatatsdb/autogen")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "vps_pmem"
                       and r._field == "pmem"
                       and r.host == "vpsfrsqlpac2")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> rename(columns: {_value: "pmem", _time: "dth"})
  |> keep(columns: ["host", "dth", "pmem"])
  |> sql.to(
      driverName: "postgres",
      dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
      table: "vpspmem"
    )