// Example: pushing metrics to Mimir with the Prometheus remote-write API.
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gogo/protobuf/proto" // Using gogo/protobuf for compatibility with prompb
	"github.com/golang/snappy"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/prompb"
)

// --- Configuration ---
// Connection settings are read from the environment once at start-up so that
// credentials do not have to be compiled into the binary. When a variable is
// unset or empty the placeholder default is used.
//
//	MIMIR_PUSH_URL  full URL of the push endpoint
//	MIMIR_USERNAME  basic-auth user name
//	MIMIR_PASSWORD  basic-auth password
var (
	mimirPushURL  = envOrDefault("MIMIR_PUSH_URL", "https://<your-mimir-distributor-url>/api/v1/push")
	mimirUsername = envOrDefault("MIMIR_USERNAME", "your-tenant-id")
	mimirPassword = envOrDefault("MIMIR_PASSWORD", "your-api-key")
)

// pushInterval is the delay between two periodic pushes.
const pushInterval = 15 * time.Second

// envOrDefault returns the value of the environment variable key, or fallback
// when the variable is unset or set to the empty string.
func envOrDefault(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// MimirPusher ships the metrics held in a Prometheus registry to Mimir.
type MimirPusher struct {
	// client is the HTTP client used for every push request.
	client *http.Client
	// registry is the source the metrics are gathered from.
	registry *prometheus.Registry
}

// NewMimirPusher returns a pusher that reads from registry and sends its
// requests through a dedicated HTTP client with a 10 second timeout.
func NewMimirPusher(registry *prometheus.Registry) *MimirPusher {
	httpClient := &http.Client{Timeout: 10 * time.Second}

	pusher := new(MimirPusher)
	pusher.registry = registry
	pusher.client = httpClient
	return pusher
}

// Start pushes metrics once every pushInterval and blocks until ctx is
// canceled. On cancellation it attempts one last push and then returns.
// Push failures are logged, never returned.
func (p *MimirPusher) Start(ctx context.Context) {
	tick := time.NewTicker(pushInterval)
	defer tick.Stop()

	log.Println("Starting Mimir pusher...")
	for {
		select {
		case <-ctx.Done():
			log.Println("Stopping Mimir pusher...")
			// Flush whatever accumulated since the last tick before leaving.
			if pushErr := p.pushMetrics(); pushErr != nil {
				log.Printf("ERROR: Failed to push final metrics: %v", pushErr)
			}
			return

		case <-tick.C:
			pushErr := p.pushMetrics()
			if pushErr == nil {
				log.Println("Successfully pushed metrics to Mimir.")
				continue
			}
			log.Printf("ERROR: Failed to push metrics: %v", pushErr)
		}
	}
}

// pushMetrics gathers the registry's current metrics, converts every gauge,
// counter and untyped metric into a remote-write time series carrying a
// single sample (all samples share one timestamp taken at the start of the
// push), and POSTs the snappy-compressed protobuf to mimirPushURL using
// basic auth.
//
// Summaries and histograms are skipped for simplicity.
//
// It returns an error when gathering, marshalling, building or sending the
// request fails, or when the server answers with a non-2xx status; in the
// latter case the error contains the status and the beginning of the
// response body.
func (p *MimirPusher) pushMetrics() error {
	// Upper bound on how much of an error response is read into memory.
	const maxErrorBodyBytes = 4 << 10

	// 1. Gather metrics from the registry
	metricFamilies, err := p.registry.Gather()
	if err != nil {
		return fmt.Errorf("could not gather metrics: %w", err)
	}

	now := time.Now().UnixMilli()
	writeRequest := &prompb.WriteRequest{}

	// 2. Transform metrics to prompb.TimeSeries
	for _, mf := range metricFamilies {
		for _, m := range mf.GetMetric() {
			// Decide first whether the metric is supported so that no label
			// slice is allocated for metrics that are skipped anyway.
			var value float64
			switch {
			case m.GetGauge() != nil:
				value = m.GetGauge().GetValue()
			case m.GetCounter() != nil:
				value = m.GetCounter().GetValue()
			case m.GetUntyped() != nil:
				value = m.GetUntyped().GetValue()
			default:
				// Skipping other types (Summary, Histogram) for simplicity
				continue
			}

			// The metric name travels as the reserved __name__ label.
			// NOTE(review): remote write expects labels ordered by name;
			// this relies on the gathered labels already being sorted and
			// sorting after "__name__" - confirm for your label names.
			labels := make([]prompb.Label, 0, len(m.GetLabel())+1)
			labels = append(labels, prompb.Label{
				Name:  model.MetricNameLabel,
				Value: mf.GetName(),
			})
			for _, l := range m.GetLabel() {
				labels = append(labels, prompb.Label{
					Name:  l.GetName(),
					Value: l.GetValue(),
				})
			}

			writeRequest.Timeseries = append(writeRequest.Timeseries, prompb.TimeSeries{
				Labels: labels,
				Samples: []prompb.Sample{
					{
						Value:     value,
						Timestamp: now,
					},
				},
			})
		}
	}

	// 3. Marshal to protobuf and compress with Snappy
	data, err := proto.Marshal(writeRequest)
	if err != nil {
		return fmt.Errorf("could not marshal protobuf: %w", err)
	}
	compressedData := snappy.Encode(nil, data)

	// 4. Create and send the HTTP request
	req, err := http.NewRequest(http.MethodPost, mimirPushURL, bytes.NewReader(compressedData))
	if err != nil {
		return fmt.Errorf("could not create http request: %w", err)
	}

	// Set necessary headers
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

	// Add authentication
	req.SetBasicAuth(mimirUsername, mimirPassword)

	resp, err := p.client.Do(req)
	if err != nil {
		return fmt.Errorf("http request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Only the beginning of the body is kept: it is meant for a log
		// line and a misbehaving server must not exhaust memory.
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		if readErr != nil {
			return fmt.Errorf("received non-2xx response: %s (reading body failed: %v)", resp.Status, readErr)
		}
		return fmt.Errorf("received non-2xx response: %s - %s", resp.Status, string(body))
	}

	// Drain the body so the transport can reuse the connection for the next
	// push. Best effort: the push itself already succeeded, so a failure
	// here is deliberately ignored.
	_, _ = io.Copy(io.Discard, resp.Body)

	return nil
}

// main registers two demo metrics on a private registry, updates them every
// two seconds in the background, and pushes them to Mimir until SIGINT or
// SIGTERM is received. It returns only after the pusher has finished, so the
// final push performed by Start on shutdown is not cut off by process exit.
func main() {
	// Set up graceful shutdown first so every goroutine below can observe it.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Use a custom registry instead of the global one. This is best practice.
	registry := prometheus.NewRegistry()

	// Instrument your application with some metrics
	opsProcessed := promauto.With(registry).NewCounter(prometheus.CounterOpts{
		Name: "myapp_processed_ops_total",
		Help: "The total number of processed operations.",
	})

	temperature := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
		Name: "myapp_temperature_celsius",
		Help: "The current temperature of a sensor.",
	}, []string{"sensor_id"})

	// Simulate work in a separate goroutine; it ends when ctx is canceled.
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()

		for {
			opsProcessed.Inc()
			temperature.WithLabelValues("A1").Set(rand.Float64()*10 + 15) // Temp between 15-25
			temperature.WithLabelValues("B2").Set(rand.Float64()*15 + 20) // Temp between 20-35

			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()

	// Create and start the Mimir pusher
	pusher := NewMimirPusher(registry)

	pusherDone := make(chan struct{})
	go func() {
		defer close(pusherDone)
		pusher.Start(ctx)
	}()

	<-ctx.Done() // Wait for shutdown signal
	// Restore default signal handling so a second signal terminates the
	// process immediately instead of waiting for the final push.
	stop()
	log.Println("Application shutting down.")

	// Block until Start has returned, i.e. until its final push is done.
	<-pusherDone
}