Introduction
Network anomalies can indicate various issues, from performance bottlenecks and misconfigurations to security threats. Manually monitoring network logs for these anomalies is time-consuming and often ineffective. This article demonstrates how to automate network anomaly detection using Grafana Loki for log aggregation and a simple machine learning (ML) model for anomaly scoring. We’ll focus on detecting unusual traffic patterns based on log data.
Prerequisites
Before you begin, ensure you have the following:
- Grafana: A monitoring and observability platform.
- Grafana Loki: A log aggregation system.
- Python 3.6+: With the pandas, scikit-learn, and requests libraries installed.
- Network device logs: Syslog or other formats containing network traffic information.
Architecture Overview
The system works as follows:
- Network Devices: Generate logs containing network traffic data (e.g., source IP, destination IP, bytes transferred).
- Loki: Collects and stores these logs.
- Python Script (Anomaly Detector): Queries Loki for log data, preprocesses the data, applies a machine learning model to score anomalies, and sends alerts to a notification system (e.g., Grafana Alerting, Slack).
```mermaid
graph LR
    A[Network Devices] --> B(Loki);
    B --> C{"Python Script (Anomaly Detector)"};
    C --> D[Notification System];
```
Step 1: Configuring Network Devices to Send Logs to Loki
Configure your network devices (routers, switches, firewalls) to forward their logs to Loki. The specific configuration depends on your devices and Loki setup. Here’s a general example using rsyslog:
```
*.* @loki-server:514
```
Replace loki-server with the hostname or IP address of your Loki server. Ensure Loki is configured to receive these logs.
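Note that Loki itself does not speak syslog; in practice an agent such as Promtail receives the syslog stream and pushes it to Loki over HTTP. A minimal sketch of a hypothetical promtail.yml for this setup (hostnames and ports are assumptions; the `job: network-logs` label matches the query used by the detection script later):

```yaml
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki-server:3100/loki/api/v1/push

scrape_configs:
  - job_name: network-syslog
    syslog:
      # Promtail's syslog receiver expects RFC 5424 messages over TCP
      listen_address: 0.0.0.0:1514
      labels:
        job: network-logs
```

With this in place, point rsyslog at the Promtail host and port instead (TCP is `@@` in rsyslog, e.g. `*.* @@promtail-host:1514`).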
Step 2: Setting up Grafana Loki
Loki needs to be configured to receive and store the network logs. A basic loki.yml configuration file might look like this:
```yaml
auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    address: 127.0.0.1
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1
  chunk_idle_period: 5m
  chunk_retain_period: 30m

schema_config:
  configs:
    - from: 2020-10-15
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

storage_config:
  boltdb_shipper:
    active_index_directory: /tmp/loki/index
    shared_store: filesystem
    index_gateway_client:
      server_address: 127.0.0.1:9095
  filesystem:
    path: /tmp/loki/chunks
```
This configuration stores logs in the /tmp/loki directory. Adjust the paths and retention periods according to your needs. For production environments, consider using a more robust storage backend like S3 or GCS.
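Before wiring up real devices, you can sanity-check the pipeline by pushing a synthetic log line through Loki's HTTP push API. A minimal sketch, assuming a local Loki on port 3100; the line format mirrors the `src_ip=`/`dst_ip=`/`bytes=` fields that the detection script parses later:

```python
import json
import time

def build_push_payload(src_ip, dst_ip, nbytes):
    """Builds a Loki push-API payload carrying one synthetic network log line."""
    # Loki expects the timestamp as a nanosecond Unix epoch string
    ts_ns = str(int(time.time() * 1e9))
    line = f"src_ip={src_ip}, dst_ip={dst_ip}, bytes={nbytes}"
    return {"streams": [{"stream": {"job": "network-logs"},
                         "values": [[ts_ns, line]]}]}

payload = build_push_payload("10.0.0.1", "10.0.0.2", 1234)
# Send it (requires a running Loki):
# requests.post("http://localhost:3100/loki/api/v1/push", json=payload, timeout=5)
print(json.dumps(payload)[:72])
```

A successful push returns HTTP 204, and the line should then appear under `{job="network-logs"}` in Grafana's Explore view.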
Step 3: Writing the Anomaly Detection Script (Python)
This script will:
- Query Loki for network log data.
- Preprocess the data to extract relevant features.
- Train a simple machine learning model (e.g., Isolation Forest) on historical data.
- Score new data for anomalies.
- Send alerts if the anomaly score exceeds a threshold.
```python
import pandas as pd
import requests
from sklearn.ensemble import IsolationForest
from datetime import datetime, timedelta

# Loki Configuration
LOKI_URL = "http://localhost:3100"
LOKI_QUERY = '{job="network-logs"} |= "src_ip" |= "dst_ip" |= "bytes"'
QUERY_RANGE_MINUTES = 5   # Query the last 5 minutes of logs
ANOMALY_THRESHOLD = -0.1  # Adjust based on your data

def query_loki(query, range_minutes):
    """Queries Loki for log data over the given time range."""
    end = datetime.utcnow()
    start = end - timedelta(minutes=range_minutes)
    params = {
        'query': query,
        'start': start.isoformat() + 'Z',
        'end': end.isoformat() + 'Z',
    }
    response = requests.get(f"{LOKI_URL}/loki/api/v1/query_range", params=params)
    response.raise_for_status()
    return response.json()

def preprocess_data(data):
    """Preprocesses log data to extract features."""
    rows = []
    for result in data['data']['result']:
        for value in result['values']:
            log_line = value[1]
            try:
                src_ip = log_line.split("src_ip=")[1].split(",")[0].strip()
                dst_ip = log_line.split("dst_ip=")[1].split(",")[0].strip()
                bytes_transferred = int(log_line.split("bytes=")[1].split(",")[0].strip())
                rows.append({'src_ip': src_ip, 'dst_ip': dst_ip, 'bytes': bytes_transferred})
            except (IndexError, ValueError):
                print(f"Skipping malformed log line: {log_line}")
                continue
    df = pd.DataFrame(rows, columns=['src_ip', 'dst_ip', 'bytes'])
    if df.empty:
        return df
    # Simple example: aggregate bytes by destination IP. The index carries
    # the destination IP so alerts can name it later.
    return df.groupby('dst_ip')[['bytes']].sum()

def train_model(data):
    """Trains an Isolation Forest model."""
    model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
    model.fit(data)
    return model

def detect_anomalies(model, data):
    """Detects anomalies using the trained model."""
    return model.decision_function(data)

def send_alert(message):
    """Sends an alert to a notification system (e.g., Slack, Grafana Alerting)."""
    print(f"ALERT: {message}")  # Replace with your actual alerting mechanism

def main():
    """Main function to orchestrate the anomaly detection process."""
    try:
        # 1. Query Loki
        log_data = query_loki(LOKI_QUERY, QUERY_RANGE_MINUTES)

        # 2. Preprocess Data
        if not log_data['data']['result']:
            print("No data found in Loki for the specified query and range.")
            return
        preprocessed_data = preprocess_data(log_data)
        if preprocessed_data.empty:
            print("No valid data after preprocessing.")
            return

        # 3. Train Model. For this example we train on the current window;
        # in production, train on a separate set of historical data.
        model = train_model(preprocessed_data)

        # 4. Detect Anomalies
        anomaly_scores = detect_anomalies(model, preprocessed_data)

        # 5. Alerting
        for i, score in enumerate(anomaly_scores):
            if score < ANOMALY_THRESHOLD:
                dst_ip = preprocessed_data.index[i]  # Index is the destination IP
                bytes_transferred = preprocessed_data['bytes'].iloc[i]
                message = (f"Possible network anomaly: high traffic to destination IP "
                           f"{dst_ip} ({bytes_transferred} bytes) - anomaly score: {score:.3f}")
                send_alert(message)
    except requests.exceptions.RequestException as e:
        print(f"Error querying Loki: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    main()
```
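The hard-coded ANOMALY_THRESHOLD is the weakest part of the script: Isolation Forest decision scores shift with the data distribution, so a fixed cutoff can over- or under-alert. One alternative is to derive the threshold from a low percentile of the scores the model assigns to its own training data. A sketch under that assumption (`fit_threshold` is a hypothetical helper, not part of the script above):

```python
import numpy as np
from sklearn.ensemble import IsolationForest

def fit_threshold(byte_counts, percentile=1.0):
    """Fits an Isolation Forest and returns (model, threshold), where the
    threshold is a low percentile of the training scores."""
    X = np.asarray(byte_counts, dtype=float).reshape(-1, 1)
    model = IsolationForest(n_estimators=100, contamination='auto', random_state=42)
    model.fit(X)
    scores = model.decision_function(X)
    return model, float(np.percentile(scores, percentile))

# Mostly ordinary per-destination byte counts plus one extreme outlier
traffic = [1200, 1500, 1100, 1350, 1400, 1250, 1300, 5_000_000]
model, threshold = fit_threshold(traffic)
flagged = model.decision_function(np.array([[5_000_000]]))[0] < threshold
```

With the percentile approach, the cutoff adapts to each training window instead of relying on a magic number; the percentile itself still needs tuning against your alert-volume tolerance.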
Explanation:
- query_loki(): Queries Loki using the specified query
