Automating Network Anomaly Detection with Grafana Loki and Machine Learning

Introduction#

Network anomalies can indicate various issues, from performance bottlenecks and misconfigurations to security threats. Manually monitoring network logs for these anomalies is time-consuming and often ineffective. This article demonstrates how to automate network anomaly detection using Grafana Loki for log aggregation and a simple machine learning (ML) model for anomaly scoring. We’ll focus on detecting unusual traffic patterns based on log data.

Prerequisites#

Before you begin, ensure you have the following:

  • Grafana: A monitoring and observability platform.
  • Grafana Loki: A log aggregation system.
  • Python 3.6+: With libraries like pandas, scikit-learn, and requests.
  • Network device logs: Syslog or other formats containing network traffic information.
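The Python dependencies can be installed with pip (note the package name scikit-learn differs from its import name, sklearn):

```shell
# Install the libraries used by the anomaly detection script
pip install pandas scikit-learn requests
```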

Architecture Overview#

The system works as follows:

  1. Network Devices: Generate logs containing network traffic data (e.g., source IP, destination IP, bytes transferred).
  2. Loki: Collects and stores these logs.
  3. Python Script (Anomaly Detector): Queries Loki for log data, preprocesses the data, applies a machine learning model to score anomalies, and sends alerts to a notification system (e.g., Grafana Alerting, Slack).
graph LR
    A[Network Devices] --> B(Loki);
    B --> C{"Python Script (Anomaly Detector)"};
    C --> D[Notification System];

Step 1: Configuring Network Devices to Send Logs to Loki#

Configure your network devices (routers, switches, firewalls) to forward their logs toward Loki. Note that Loki itself does not accept syslog directly; the usual pattern is to send syslog to Promtail, which listens on a syslog target and pushes the logs to Loki. The device-side configuration depends on your hardware; here's a general example using rsyslog to forward over TCP:

*.* @@promtail-server:1514

Replace promtail-server with the hostname or IP address of the host running Promtail (a single @ forwards over UDP instead of TCP). Ensure Promtail's syslog scrape config is listening on that port and pointing at your Loki instance.
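Loki typically ingests logs through Promtail, so a minimal Promtail configuration with a syslog scrape target might look like the following sketch. The listen port, Loki URL, and the job label are assumptions; the job label chosen here is the one the detection script later queries for:

```yaml
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki-server:3100/loki/api/v1/push

scrape_configs:
  - job_name: syslog
    syslog:
      # Port that network devices (or an rsyslog relay) forward to
      listen_address: 0.0.0.0:1514
      labels:
        job: network-logs
    relabel_configs:
      # Preserve the originating hostname as a queryable label
      - source_labels: ['__syslog_message_hostname']
        target_label: host
```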

Step 2: Setting up Grafana Loki#

Loki needs to be configured to receive and store the network logs. A basic loki.yml configuration file might look like this:

auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    address: 127.0.0.1
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1
  chunk_idle_period: 5m
  chunk_retain_period: 30m

schema_config:
  configs:
    - from: 2020-10-15
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

storage_config:
  boltdb_shipper:
    active_index_directory: /tmp/loki/index
    shared_store: filesystem
    index_gateway_client:
      server_address: 127.0.0.1:9095
  filesystem:
    path: /tmp/loki/chunks

This configuration stores logs in the /tmp/loki directory. Adjust the paths and retention periods according to your needs. For production environments, consider using a more robust storage backend like S3 or GCS.
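Once logs are flowing, you can verify ingestion in Grafana's Explore view with a LogQL query; the job label here assumes the label applied at ingestion time:

```logql
{job="network-logs"} |= "bytes"
```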

Step 3: Writing the Anomaly Detection Script (Python)#

This script will:

  1. Query Loki for network log data.
  2. Preprocess the data to extract relevant features.
  3. Train a simple machine learning model (e.g., Isolation Forest) on historical data.
  4. Score new data for anomalies.
  5. Send alerts if the anomaly score exceeds a threshold.
import pandas as pd
import requests
from sklearn.ensemble import IsolationForest
from datetime import datetime, timedelta

# Loki Configuration
LOKI_URL = "http://localhost:3100"
LOKI_QUERY = '{job="network-logs"} |= "src_ip" |= "dst_ip" |= "bytes"'
QUERY_RANGE = "5m"  # Query the last 5 minutes of logs
ANOMALY_THRESHOLD = -0.1  # Adjust based on your data

def query_loki(query, time_range):
    """Queries Loki's query_range API for log lines in the given window."""
    end = datetime.utcnow()
    # time_range is expected in the form "<minutes>m", e.g. "5m"
    start = end - timedelta(minutes=int(time_range[:-1]))
    params = {
        'query': query,
        'start': start.isoformat() + 'Z',
        'end': end.isoformat() + 'Z',
    }
    response = requests.get(f"{LOKI_URL}/loki/api/v1/query_range", params=params)
    response.raise_for_status()
    return response.json()

def preprocess_data(data):
    """Preprocesses log data to extract features."""
    records = []
    for result in data['data']['result']:
        for value in result['values']:
            log_line = value[1]
            try:
                src_ip = log_line.split("src_ip=")[1].split(",")[0].strip()
                dst_ip = log_line.split("dst_ip=")[1].split(",")[0].strip()
                bytes_transferred = int(log_line.split("bytes=")[1].split(",")[0].strip())
                records.append({'src_ip': src_ip, 'dst_ip': dst_ip, 'bytes': bytes_transferred})
            except (IndexError, ValueError):
                print(f"Skipping malformed log line: {log_line}")
                continue

    df = pd.DataFrame(records, columns=['src_ip', 'dst_ip', 'bytes'])
    if df.empty:
        return df

    # Simple example: aggregate bytes by destination IP. Keeping dst_ip as
    # the index lets the alerting step recover it from the anomaly scores.
    df_grouped = df.groupby('dst_ip')['bytes'].sum().to_frame()
    return df_grouped  # Single 'bytes' feature column, indexed by dst_ip


def train_model(data):
    """Trains an Isolation Forest model."""
    model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
    model.fit(data)
    return model

def detect_anomalies(model, data):
    """Detects anomalies using the trained model."""
    scores = model.decision_function(data)
    return scores

def send_alert(message):
    """Sends an alert to a notification system (e.g., Slack, Grafana Alerting)."""
    print(f"ALERT: {message}") # Replace with your actual alerting mechanism

def main():
    """Main function to orchestrate the anomaly detection process."""
    try:
        # 1. Query Loki
        log_data = query_loki(LOKI_QUERY, QUERY_RANGE)

        # 2. Preprocess Data
        if not log_data['data']['result']:
            print("No data found in Loki for the specified query and range.")
            return

        preprocessed_data = preprocess_data(log_data)

        if preprocessed_data.empty:
            print("No valid data after preprocessing.")
            return

        # 3. Train Model (using historical data - replace with your data loading)
        # For this example, we'll train on the current data (not ideal for production)
        model = train_model(preprocessed_data)

        # 4. Detect Anomalies
        anomaly_scores = detect_anomalies(model, preprocessed_data)

        # 5. Alerting
        for i, score in enumerate(anomaly_scores):
            if score < ANOMALY_THRESHOLD:
                print(f"Anomaly detected: Score = {score}")
                # Assuming preprocess_data returns the 'bytes' column and groups by 'dst_ip'
                dst_ip = preprocessed_data.index[i]  # Get the corresponding dst_ip
                bytes_transferred = preprocessed_data['bytes'].iloc[i]
                message = f"Possible network anomaly: High traffic to destination IP {dst_ip} ({bytes_transferred} bytes) - Anomaly Score: {score}"
                send_alert(message)

    except requests.exceptions.RequestException as e:
        print(f"Error querying Loki: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()

Explanation:

  • query_loki(): Queries Loki's query_range API for log lines matching the query over the requested time window.
  • preprocess_data(): Parses src_ip, dst_ip, and bytes out of each log line and aggregates total bytes per destination IP.
  • train_model(): Fits an Isolation Forest on the aggregated byte counts.
  • detect_anomalies(): Scores each destination with the model's decision_function; lower scores are more anomalous.
  • send_alert(): A placeholder for your real notification mechanism (Slack, Grafana Alerting, etc.).
  • main(): Ties the steps together: query, preprocess, train, score, and alert on any score below ANOMALY_THRESHOLD.
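To build intuition for how decision_function scores behave before wiring the detector to live logs, here is a self-contained sketch on synthetic byte counts (all values invented for illustration):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

# Synthetic per-destination byte counts: 200 typical values around 1 KB,
# plus one extreme outlier simulating a traffic spike.
rng = np.random.default_rng(42)
normal_traffic = rng.normal(loc=1_000, scale=100, size=(200, 1))
data = np.vstack([normal_traffic, [[50_000]]])

model = IsolationForest(n_estimators=100, contamination="auto", random_state=42)
model.fit(data)
scores = model.decision_function(data)

# Lower scores mean "more anomalous"; the spike scores lowest of all points.
print(f"outlier score: {scores[-1]:.3f}, lowest normal score: {scores[:-1].min():.3f}")
```

Plotting or printing the score distribution like this is a practical way to choose a sensible ANOMALY_THRESHOLD for your own traffic.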
https://en.dymripper.com/posts/2025-06-01-automating-network-anomaly-detection-with-grafana-loki-and-machine-learning/
Author: DYMripper
Published at: 2025-06-01