Source code for tempdataset.core.datasets.server_metrics

"""
Server metrics dataset generator.

Generates realistic server performance metrics.
"""

import random
from datetime import datetime, timedelta
from typing import List, Dict, Any

from .base import BaseDataset
from ..utils.faker_utils import get_faker_utils


[docs] class ServerMetricsDataset(BaseDataset): """Server metrics dataset generator for system monitoring and performance analysis.""" def __init__(self, rows: int = 500): super().__init__(rows) self.faker_utils = get_faker_utils() self._init_data_lists() self._metric_counter = 1 def _init_data_lists(self) -> None: self.server_names = [ 'web-01', 'web-02', 'web-03', 'api-01', 'api-02', 'db-primary', 'db-replica', 'cache-01', 'cache-02', 'lb-01', 'worker-01', 'worker-02', 'monitor-01', 'backup-01' ] self.metric_types = [ 'cpu_usage', 'memory_usage', 'disk_usage', 'network_io', 'disk_io', 'load_average', 'connection_count', 'response_time' ] self.server_environments = ['production', 'staging', 'development'] self.datacenter_locations = [ 'us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1', 'us-central-1', 'eu-central-1' ]
[docs] def generate(self) -> List[Dict[str, Any]]: if self.seed is not None: random.seed(self.seed) self.faker_utils.set_seed(self.seed) return [self._generate_row() for _ in range(self.rows)]
def _generate_row(self) -> Dict[str, Any]: # Basic metric info metric_id = f"METRIC-2025-{self._metric_counter:08d}" self._metric_counter += 1 # Timestamp - metrics are recent and frequent timestamp = self.faker_utils.date_between( datetime.now() - timedelta(days=1), datetime.now() ) timestamp = datetime.combine( timestamp, datetime.min.time().replace( hour=random.randint(0, 23), minute=random.randint(0, 59), second=random.randint(0, 59) ) ) # Server info server_name = random.choice(self.server_names) environment = random.choice(self.server_environments) datacenter = random.choice(self.datacenter_locations) # CPU metrics cpu_usage = round(random.uniform(10.0, 95.0), 2) cpu_cores = random.choice([2, 4, 8, 16, 32]) # Memory metrics (in GB) memory_total_gb = random.choice([8, 16, 32, 64, 128]) memory_used_gb = round(memory_total_gb * random.uniform(0.2, 0.9), 2) memory_usage_percent = round((memory_used_gb / memory_total_gb) * 100, 2) # Disk metrics (in GB) disk_total_gb = random.choice([100, 250, 500, 1000, 2000]) disk_used_gb = round(disk_total_gb * random.uniform(0.1, 0.8), 2) disk_usage_percent = round((disk_used_gb / disk_total_gb) * 100, 2) # Network metrics (in MB/s) network_in_mbps = round(random.uniform(0.1, 100.0), 2) network_out_mbps = round(random.uniform(0.1, 100.0), 2) # Load average (typical Unix load averages) load_average_1min = round(random.uniform(0.1, cpu_cores * 1.5), 2) load_average_5min = round(random.uniform(0.1, cpu_cores * 1.2), 2) load_average_15min = round(random.uniform(0.1, cpu_cores * 1.0), 2) # Connection metrics active_connections = random.randint(10, 1000) # Temperature (in Celsius) - higher for high CPU usage if cpu_usage > 80: temperature_celsius = round(random.uniform(65.0, 85.0), 1) elif cpu_usage > 60: temperature_celsius = round(random.uniform(50.0, 70.0), 1) else: temperature_celsius = round(random.uniform(35.0, 55.0), 1) # Uptime in hours uptime_hours = random.randint(1, 8760) # Up to 1 year # Status based on various metrics if (cpu_usage > 90 or memory_usage_percent > 95 or disk_usage_percent > 95 or temperature_celsius > 80): status = 'critical' elif (cpu_usage > 80 or memory_usage_percent > 85 or disk_usage_percent > 85 or temperature_celsius > 70): status = 'warning' else: status = 'healthy' return { 'metric_id': metric_id, 'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S'), 'server_name': server_name, 'environment': environment, 'datacenter': datacenter, 'cpu_usage_percent': cpu_usage, 'cpu_cores': cpu_cores, 'memory_total_gb': memory_total_gb, 'memory_used_gb': memory_used_gb, 'memory_usage_percent': memory_usage_percent, 'disk_total_gb': disk_total_gb, 'disk_used_gb': disk_used_gb, 'disk_usage_percent': disk_usage_percent, 'network_in_mbps': network_in_mbps, 'network_out_mbps': network_out_mbps, 'load_average_1min': load_average_1min, 'load_average_5min': load_average_5min, 'load_average_15min': load_average_15min, 'active_connections': active_connections, 'temperature_celsius': temperature_celsius, 'uptime_hours': uptime_hours, 'status': status }
[docs] def get_schema(self) -> Dict[str, str]: return { 'metric_id': 'string', 'timestamp': 'datetime', 'server_name': 'string', 'environment': 'string', 'datacenter': 'string', 'cpu_usage_percent': 'float', 'cpu_cores': 'integer', 'memory_total_gb': 'integer', 'memory_used_gb': 'float', 'memory_usage_percent': 'float', 'disk_total_gb': 'integer', 'disk_used_gb': 'float', 'disk_usage_percent': 'float', 'network_in_mbps': 'float', 'network_out_mbps': 'float', 'load_average_1min': 'float', 'load_average_5min': 'float', 'load_average_15min': 'float', 'active_connections': 'integer', 'temperature_celsius': 'float', 'uptime_hours': 'integer', 'status': 'string' }