Interrogation des données
- Définition de la source de données (bucket - database) :
from
from(bucket: "netdatatsdb/autogen")
- Plage de temps, absolue ou relative :
range
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h, stop: -10m)
from(bucket: "netdatatsdb/autogen")
|> range(start: 2021-01-25T00:00:00Z, stop: 2021-01-29T23:59:00Z)
- Filtrer par mesure :
filter
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
- Filtrer par tag key :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
|> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
- Filtrer par champ et valeur de champ :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics")
|> filter(fn: (r) => r["host"] == "vpsfrsqlpac1")
|> filter(fn: (r) => r["_field"] == "pcpu")
|> filter(fn: (r) => r["_value"] > 80)
- Les filtres peuvent être combinés en une seule clause
filter
avec les opérateursand
/or
:
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "vpsmetrics" and
r["host"] == "vpsfrsqlpac1" and
r["_field"] == "pcpu" and r["_value"] > 80
)
- Selon les préférences, la notation point est autorisée :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and r._value > 80
)
- Affichage des données :
yield
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and r._value > 80 )
|> yield()
Les clauses from
et range
sont supprimées par souci de concision.
- Utiliser les expressions régulières :
|> filter(fn: (r) => r.host =~ /^vpsfrsqlpac[1-8]$/ )
|> filter(fn: (r) => r.host !~ /^vpsensqlpac[1-8]$/ )
n
premiers enregistrements :limit
|> limit(n:10)
n
derniers enregistrements :tail
|> tail(n:10)
- Trier les données :
sort
|> sort(columns: ["_value"], desc: true)
- Renommer une colonne :
rename
|> rename(columns: {_value: "average", _time: "when"})
- Supprimer des colonnes en sortie :
drop
|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
- Sélection des colonnes en sortie :
keep
|> keep(columns: ["_value", "_time"])
- Une simple requête Flux avant d’aller plus loin :
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" and
r._value > 80)
|> sort(columns: ["_value"], desc: true)
|> rename(columns: {_value: "average"})
|> drop(fn: (column) => column =~ /^_(start|stop|measurement)/)
|> limit(n:10)
|> yield()
Fenêtrage des données (Windowing)
aggregateWindow
|> aggregateWindow(
every: 1m,
fn: mean,
column: "_value",
timeSrc: "_stop",
timeDst: "_time",
createEmpty: true
)
|> aggregateWindow(every: 10m, fn: mean)
Pour agréger sur une colonne différente de la colonne par défaut _value
:
|> aggregateWindow(every: 10m, fn: mean, column: "colname")
Pour supprimer les valeurs NULL
, définir createEmpty
à False
:
|> aggregateWindow(every: 10m, fn: mean, createEmpty: false)
window
aggregateWindow
est en fait une fonction de raccourci utilisant la fonction window
.
|> window(every: 10m)
|> mean()
createEmpty
est défini à true
pour afficher les valeurs nulles.
|> window(every: 10m, createEmpty: true)
|> mean()
La colonne _time
n’est alors plus aggrégée dans la table en sortie,
pour ré-ajouter la colonne _time
pour un traitement ultérieur,
la fonction duplicate
est appelée pour dupliquer la colonne _start
ou _stop
en tant que nouvelle colonne _time
:
|> window(every: 10m, createEmpty: true)
|> mean()
|> duplicate(column: "_stop", as: "_time")
Pour retrouver le format régulier, les données sont finalement "unwindowed" :
|> window(every: 10m, createEmpty: true)
|> mean()
|> duplicate(column: "_stop", as: "_time")
|> window(every: inf)
fill
Optionnellement, utiliser la fonction fill
pour gérer les valeurs vides lorsque createEmpty
est défini à true
lors du fenêtrage des données.
|> fill(column: "_value", value: 0.0)
|> fill(column: "_value", usePrevious: true)
- Fenêtrage par mois et années
Flux supporte le fenêtrage des données par mois et années calendaires : 1mo
, 1yr
. Cette fonctionnalité
n’était pas possible avec InfluxQL.
|> aggregateWindow(every: 1mo, fn: mean)
Sélection de plages horaires
hourSelection
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> hourSelection(start: 8, stop: 18)
Jointures (join)
join
datapcpu = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pcpu" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu" )
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
datapmem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vps_pmem" and
r.host == "vpsfrsqlpac1" and
r._field == "pmem" )
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
join(
tables: {pcpu:datapcpu, pmem:datapmem},
on: ["_time", "host"],
method: "inner"
)
Seule la méthode inner
est actuellement autorisée (v 2.0.4).
Dans des versions futures, d’autres méthodes de jointures seront progressivement implémentées : cross
, left
, right
, full
.
La réalisation des jointures sur les colonnes _time
contenant secondes, microsecondes, nanosecondes…
peut être résolue en normalisant les horodatages irréguliers à l’aide de la fonction truncateTimeColumn
qui
tronque les valeurs _time
à une unité spécifiée :
|> truncateTimeColumn(unit: 1m)
Pivot
pivot
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" )
|> pivot(
rowKey:["_time"],
columnKey: ["_field"],
valueColumn: "_value"
)
Comme les jointures, utiliser la fonction truncateTimeColumn
pour normaliser les timestamps.
Utiliser la fonction de raccourci schema.fieldsAsCols()
pour pivoter les données uniquement sur time
/_field
:
import "influxdata/influxdb/schema"
from(bucket: "netdatatsdb/autogen")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" )
|> schema.fieldsAsCols()
Histogrammes
histogram
from(bucket:"netdatatsdb/autogen")
|> range(start: -3h)
|> filter(fn: (r) => r._measurement == "vpsmetrics" and
r.host == "vpsfrsqlpac1" and
r._field == "pcpu"
)
|> histogram(
bins:[0.0,10.0,20.0,30.0,40.0,50.0,60.0,70.0,80.0,90.0,100.0]
)
Utiliser linearBins
pour faciliter la création de la liste des valeurs float linéairement séparés.
|> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
Les valeurs float séparées logarithmiquement sont définies avec logarithmicBins
|> histogram(
bins: logarithmicBins(start: 1.0, factor: 2.0, count: 10,
infinity: true)
)
Par défaut, les valeurs sont cumulatives dans la colonne _value
, appliquer la fonction difference
pour remplacer le calcul cumulatif.
|> histogram( bins: linearBins(start: 0.0, width: 10.0, count: 10) )
|> difference()
histogram histogram + difference
le _value le _value
----- ------ ----- ------
50 292 50 269
60 536 60 241
70 784 70 247
Colonnes calculées - Computed columns (map)
map
|> map(fn: (r) => ({ _value: r._value * 2.0 }))
L’utilisation de base de la fonction map
supprime les colonnes
qui ne font pas partie de la clé de groupe (_time
, _start
, _stop
…)
et qui ne sont pas explicitement "mappées",
utiliser l’opérateur with
pour empêcher leur suppression :
|> map(fn: (r) => ({ r with _value: r._value * 2.0 }))
L’opérateur with
met à jour la colonne si elle existe déjà, créé une nouvelle colonne si elle n’existe pas,
et retourne toutes les colonnes existantes dans la table en sortie.
Fonctions d’agrégat personnalisées (reduce)
reduce
|> reduce(fn: (r, accumulator) => ({
count: accumulator.count + 1,
total: accumulator.total + r._value,
avg: (accumulator.total + r._value) / float(v: accumulator.count)
}),
identity: {count: 1, total: 0.0, avg: 0.0}
)
identity
définit les valeurs initiales ainsi que le type de données.
Une ligne unique avec les agrégats est retournée sans colonne _time
.
Écrire des données
to
|> to(bucket:"history", org:"sqlpac")
Pour écrire dans le même bucket, utiliser auparavant la fonction set
pour définir le nom de la mesure :
|> set(key: "_measurement", value: "history_vpsmetrics")
|> to(bucket:"netdatatsdb/autogen", org:"sqlpac")
Sources de données SQL
- Informations métadonnées (username, password…)
Utiliser influx secret
ou curl
pour stocker les métadonnées :
$ influx secret update --key PG_HOST --value vpsfrsqlpac
$ influx secret update --key PG_PORT --value 5432
$ influx secret update --key PG_USER --value influxdb
$ influx secret update --key PG_PASS --value "***********"
Extraire les "secrets" dans un script Flux à l’aide du package secret
(vérifier l’autorisation read:secrets
) :
import "influxdata/influxdb/secrets"
PG_HOST = secrets.get(key: "PG_HOST")
PG_USER = secrets.get(key: "PG_USER")
PG_PASS = secrets.get(key: "PG_PASS")
PG_PORT = secrets.get(key: "PG_PORT")
sql.from
: extraire les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
query: "SELECT name, totalmemory FROM vps"
)
Drivers disponibles avec InfluxDB v2.0.4 (plus à venir dans de futures releases) :
- awsathena
- bigquery
- mysql
- postgres
- snowflake
- sqlserver, mssql
- Joindre les données
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
datavps = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
query: "SELECT name, totalmemory FROM vps"
)
|> rename(columns : {name: "host"})
datamem = from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vpsmetrics"
and r._field == "mem"
and r.host == "vpsfrsqlpac1")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
join(
tables: {vps:datavps, mem:datamem},
on: ["host"],
method: "inner"
)
|> map(fn: (r) => ({ r with _value: (r._value / r.totalmemory) * 100.0 }))
sql.to
: writing data
Vérifier la structure de la table recevant les données : types de données, noms des colonnes, null
/not null
.
import "sql"
import "influxdata/influxdb/secrets"
// Get secrets…
from(bucket: "netdatatsdb/autogen")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "vps_pmem"
and r._field == "pmem"
and r.host == "vpsfrsqlpac2")
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> rename(columns: {_value: "pmem", _time: "dth"})
|> keep(columns: ["host", "dth", "pmem"])
|> sql.to(
driverName: "postgres",
dataSourceName: "postgresql://${PG_USER}:${PG_PASS}@${PG_HOST}?port=${PG_PORT}&sslmode=disable",
table: "vpspmem"
)