pushing metrics with writeapi
go
Wrap Lines
Raw package main
import (
"bytes"
"context"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/gogo/protobuf/proto" // Using gogo/protobuf for compatibility with prompb
"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
// --- Configuration ---
// Use environment variables for sensitive data
const (
mimirPushURL = "https://<your-mimir-distributor-url>/api/v1/push"
mimirUsername = "your-tenant-id" // Replace or use env var
mimirPassword = "your-api-key" // Replace or use env var
pushInterval = 15 * time.Second
)
// MimirPusher is responsible for pushing metrics to Mimir
type MimirPusher struct {
registry *prometheus.Registry
client *http.Client
}
// NewMimirPusher creates a new pusher
func NewMimirPusher(registry *prometheus.Registry) *MimirPusher {
return &MimirPusher{
registry: registry,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// Start begins the periodic push of metrics.
// It stops when the provided context is canceled.
func (p *MimirPusher) Start(ctx context.Context) {
ticker := time.NewTicker(pushInterval)
defer ticker.Stop()
log.Println("Starting Mimir pusher...")
for {
select {
case <-ticker.C:
if err := p.pushMetrics(); err != nil {
log.Printf("ERROR: Failed to push metrics: %v", err)
} else {
log.Println("Successfully pushed metrics to Mimir.")
}
case <-ctx.Done():
log.Println("Stopping Mimir pusher...")
// Final push on shutdown
if err := p.pushMetrics(); err != nil {
log.Printf("ERROR: Failed to push final metrics: %v", err)
}
return
}
}
}
// pushMetrics gathers, transforms, and sends metrics to Mimir.
func (p *MimirPusher) pushMetrics() error {
// 1. Gather metrics from the registry
metricFamilies, err := p.registry.Gather()
if err != nil {
return fmt.Errorf("could not gather metrics: %w", err)
}
now := time.Now().UnixMilli()
writeRequest := &prompb.WriteRequest{}
// 2. Transform metrics to prompb.TimeSeries
for _, mf := range metricFamilies {
for _, m := range mf.GetMetric() {
labels := make([]prompb.Label, 0, len(m.GetLabel())+1)
labels = append(labels, prompb.Label{
Name: model.MetricNameLabel,
Value: mf.GetName(),
})
for _, l := range m.GetLabel() {
labels = append(labels, prompb.Label{
Name: l.GetName(),
Value: l.GetValue(),
})
}
var value float64
if m.GetGauge() != nil {
value = m.GetGauge().GetValue()
} else if m.GetCounter() != nil {
value = m.GetCounter().GetValue()
} else {
// Skipping other types (Summary, Histogram) for simplicity
continue
}
ts := prompb.TimeSeries{
Labels: labels,
Samples: []prompb.Sample{
{
Value: value,
Timestamp: now,
},
},
}
writeRequest.Timeseries = append(writeRequest.Timeseries, ts)
}
}
// 3. Marshal to protobuf and compress with Snappy
data, err := proto.Marshal(writeRequest)
if err != nil {
return fmt.Errorf("could not marshal protobuf: %w", err)
}
compressedData := snappy.Encode(nil, data)
// 4. Create and send the HTTP request
req, err := http.NewRequest("POST", mimirPushURL, bytes.NewReader(compressedData))
if err != nil {
return fmt.Errorf("could not create http request: %w", err)
}
// Set necessary headers
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
// Add authentication
req.SetBasicAuth(mimirUsername, mimirPassword)
resp, err := p.client.Do(req)
if err != nil {
return fmt.Errorf("http request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("received non-2xx response: %s - %s", resp.Status, string(body))
}
return nil
}
func main() {
// Use a custom registry instead of the global one. This is best practice.
registry := prometheus.NewRegistry()
// Instrument your application with some metrics
opsProcessed := promauto.With(registry).NewCounter(prometheus.CounterOpts{
Name: "myapp_processed_ops_total",
Help: "The total number of processed operations.",
})
temperature := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
Name: "myapp_temperature_celsius",
Help: "The current temperature of a sensor.",
}, []string{"sensor_id"})
// Simulate work in a separate goroutine
go func() {
for {
opsProcessed.Inc()
temperature.WithLabelValues("A1").Set(rand.Float64()*10 + 15) // Temp between 15-25
temperature.WithLabelValues("B2").Set(rand.Float64()*15 + 20) // Temp between 20-35
time.Sleep(2 * time.Second)
}
}()
// Create and start the Mimir pusher
pusher := NewMimirPusher(registry)
// Set up graceful shutdown
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
go pusher.Start(ctx)
<-ctx.Done() // Wait for shutdown signal
log.Println("Application shutting down.")
}