# Source code for tempdataset.core.datasets.marketing

"""
Marketing dataset generator.

Generates realistic marketing campaign data with 36 columns including campaign information,
performance metrics, audience details, and financial calculations.
"""

import random
import string
from datetime import datetime, timedelta
from typing import List, Dict, Any

from .base import BaseDataset
from ..utils.faker_utils import get_faker_utils


class MarketingDataset(BaseDataset):
    """
    Marketing dataset generator that creates realistic marketing campaign data.

    Generates 36 columns of marketing data including:
    - Campaign information (campaign_id, name, dates, status)
    - Channel and platform details (channel, platform, creative information)
    - Audience information (target_audience, audience_size, demographics)
    - Financial data (budget, spend, revenue, costs)
    - Performance metrics (impressions, clicks, conversions, rates)
    - Geographic data (region, country)
    - Engagement metrics (likes, comments, shares)
    - Management information (agency, manager)
    """

    # (min, max) audience size per channel; channels missing from the table
    # use _DEFAULT_AUDIENCE_SIZE_RANGE.
    _AUDIENCE_SIZE_RANGES = {
        'Email': (1000, 100000),
        'Social Media': (5000, 2000000),
        'Search Engine': (10000, 5000000),
        'TV': (100000, 10000000),
        'Radio': (50000, 2000000),
        'Print': (20000, 500000),
        'Outdoor': (100000, 3000000),
    }
    _DEFAULT_AUDIENCE_SIZE_RANGE = (1000, 100000)

    # (min, max) cost per thousand impressions (CPM) per channel; channels
    # missing from the table use _DEFAULT_CPM_RANGE.
    _CPM_RANGES = {
        'Email': (0.1, 1.0),  # Very low CPM
        'Social Media': (1.0, 15.0),
        'Search Engine': (2.0, 20.0),
        'TV': (5.0, 50.0),
        'Radio': (3.0, 25.0),
        'Print': (2.0, 30.0),
        'Outdoor': (1.0, 10.0),
    }
    _DEFAULT_CPM_RANGE = (1.0, 10.0)

    # Base creative names per creative type, and the version suffixes that are
    # appended to them.
    _CREATIVE_NAME_TEMPLATES = {
        'Image': ['Static Banner', 'Product Photo', 'Brand Logo', 'Lifestyle Image'],
        'Video': ['Product Demo', 'Brand Story', 'Testimonial Video', 'How-to Video'],
        'Carousel': ['Product Showcase', 'Step-by-step Guide', 'Feature Highlights'],
        'Text': ['Compelling Copy', 'Call-to-Action Text', 'Product Description'],
        'Mixed': ['Multi-format Campaign', 'Integrated Creative', 'Cross-platform Asset'],
    }
    _CREATIVE_VERSIONS = ['V1', 'V2', 'V3', 'Final', 'A', 'B']

    def __init__(self, rows: int = 500):
        """
        Initialize the MarketingDataset generator.

        Args:
            rows: Number of rows to generate (default: 500)
        """
        super().__init__(rows)
        self.faker_utils = get_faker_utils()

        # Initialize data for consistent generation
        self._init_data_lists()

        # Counters for sequential IDs
        self._campaign_counter = 1
        self._creative_counter = 1

    def _init_data_lists(self) -> None:
        """Initialize predefined data lists for realistic generation."""
        # Marketing channels and their corresponding platforms
        self.channel_platforms = {
            'Email': ['Mailchimp', 'SendGrid', 'Constant Contact', 'Campaign Monitor',
                      'ConvertKit', 'AWeber'],
            'Social Media': ['Facebook', 'Instagram', 'Twitter', 'LinkedIn', 'TikTok',
                             'Snapchat', 'Pinterest'],
            'Search Engine': ['Google Ads', 'Bing Ads', 'Yahoo Ads', 'DuckDuckGo Ads'],
            'TV': ['National TV', 'Cable TV', 'Streaming TV', 'Local TV'],
            'Radio': ['AM Radio', 'FM Radio', 'Satellite Radio', 'Podcast Ads'],
            'Print': ['Newspaper', 'Magazine', 'Direct Mail', 'Billboard'],
            'Outdoor': ['Billboard', 'Transit Ads', 'Street Furniture', 'Digital Signage'],
        }

        # Campaign types and names
        self.campaign_types = [
            'Brand Awareness', 'Product Launch', 'Lead Generation', 'Sales Promotion',
            'Customer Retention', 'Event Promotion', 'Holiday Campaign', 'Back to School',
            'Black Friday', 'Summer Sale', 'New Year', "Valentine's Day",
        ]

        # Templates use {placeholder} markers that _generate_campaign_name fills in.
        self.campaign_name_templates = [
            '{type} Q{quarter} {year}',
            '{type} - {season} {year}',
            '{product} Launch Campaign',
            '{type} {month} {year}',
            'Holiday {type} Campaign',
            '{brand} {type} Initiative',
        ]

        # Target audiences
        # NOTE(review): '46-55' and '55-65' overlap at 55 (and '55-65' / '65+'
        # at 65). Values are kept unchanged so emitted data stays the same.
        self.age_groups = ['18-25', '26-35', '36-45', '46-55', '55-65', '65+']
        self.demographics = ['Urban', 'Suburban', 'Rural']
        self.interests = [
            'Tech-savvy', 'Fitness Enthusiasts', 'Fashion Lovers', 'Food & Beverage',
            'Travel Enthusiasts', 'Sports Fans', 'Music Lovers', 'Gaming Community',
            'Health Conscious', 'Eco-Friendly', 'Luxury Seekers', 'Budget Conscious',
        ]

        # Geographic regions and countries
        self.regions = ['North America', 'Europe', 'Asia-Pacific', 'Latin America',
                        'Middle East & Africa']
        self.region_countries = {
            'North America': ['United States', 'Canada', 'Mexico'],
            'Europe': ['United Kingdom', 'Germany', 'France', 'Italy', 'Spain', 'Netherlands'],
            'Asia-Pacific': ['Japan', 'Australia', 'South Korea', 'Singapore', 'India', 'China'],
            'Latin America': ['Brazil', 'Argentina', 'Mexico', 'Chile', 'Colombia'],
            'Middle East & Africa': ['UAE', 'Saudi Arabia', 'South Africa', 'Egypt', 'Israel'],
        }

        # Campaign statuses (not read by the methods of this class, which use
        # the literal status names directly).
        self.campaign_statuses = ['Planned', 'Active', 'Completed', 'Paused', 'Cancelled']

        # Creative types
        self.creative_types = ['Image', 'Video', 'Carousel', 'Text', 'Mixed']

        # Marketing agencies
        self.agencies = [
            'Creative Solutions Inc', 'Digital Marketing Pro', 'Brand Builders LLC',
            'Marketing Mavericks', 'Strategic Advertising Co', 'Innovation Marketing Group',
            'Global Campaigns Ltd', 'Performance Marketing Hub', 'Creative Edge Agency',
            'Digital Dynamics', 'Marketing Masters', 'Brand Vision Studios',
        ]

        # Manager names (using common marketing names)
        self.managers = [
            'Sarah Marketing', 'John Campaign', 'Lisa Strategy', 'Mike Creative',
            'Jennifer Digital', 'Robert Analytics', 'Amanda Brand', 'Chris Performance',
            'Michelle Growth', 'David Social', 'Emily Content', 'James Acquisition',
        ]

        # Seasons and months
        self.seasons = ['Spring', 'Summer', 'Fall', 'Winter']
        self.months = ['January', 'February', 'March', 'April', 'May', 'June',
                       'July', 'August', 'September', 'October', 'November', 'December']

        # Product categories for campaign naming
        self.products = [
            'Smartphone', 'Laptop', 'Clothing Line', 'Skincare', 'Fitness App',
            'Food Delivery', 'Streaming Service', 'Electric Vehicle', 'Travel Package',
            'Home Appliance', 'Gaming Console', 'Fashion Brand',
        ]
        self.brands = [
            'TechNova', 'StyleMax', 'FitLife', 'FoodieExpress', 'StreamNow',
            'EcoDrive', 'WanderLust', 'HomeComfort', 'GameZone', 'UrbanStyle',
        ]

    def generate(self) -> List[Dict[str, Any]]:
        """
        Generate marketing dataset rows.

        When ``self.seed`` is set, the ``random`` module and the faker
        utilities are reseeded and the sequential ID counters restart at 1,
        so repeated seeded calls on one instance number their rows
        identically. Without a seed the counters keep increasing across calls.

        Returns:
            List of dictionaries representing marketing campaign rows
        """
        if self.seed is not None:
            random.seed(self.seed)
            self.faker_utils.set_seed(self.seed)
            # Reseeding replays the random draws; restart the ID sequences as
            # well so the IDs replay too.
            self._campaign_counter = 1
            self._creative_counter = 1

        return [self._generate_row() for _ in range(self.rows)]

    @staticmethod
    def _ratio(numerator: float, denominator: float, scale: float = 1.0) -> float:
        """
        Return ``numerator / denominator * scale`` rounded to 2 decimals.

        Args:
            numerator: Dividend
            denominator: Divisor; values <= 0 short-circuit to 0.0
            scale: Multiplier applied after the division (100 for percentages)

        Returns:
            The rounded ratio, or 0.0 when the denominator is not positive
        """
        if denominator > 0:
            return round(numerator / denominator * scale, 2)
        # Float zero keeps the column type consistent with get_schema().
        return 0.0

    def _generate_row(self) -> Dict[str, Any]:
        """
        Generate a single marketing campaign row.

        Returns:
            Dictionary with one value for each of the 36 columns listed by
            get_schema()
        """
        # Read the clock once so the date window and the status comparison
        # are based on the same instant.
        now = datetime.now()
        current_date = now.date()

        # Campaign start dates range from 2 years ago to 6 months ahead
        window_start = now - timedelta(days=730)
        window_end = now + timedelta(days=180)
        campaign_start = self.faker_utils.date_between(window_start, window_end)

        # date_between may hand back a datetime or a plain date; keep both a
        # date (for status comparison) and a datetime (for arithmetic/IDs).
        if hasattr(campaign_start, 'date'):
            campaign_start_date = campaign_start.date()
        else:
            campaign_start_date = campaign_start
            campaign_start = datetime.combine(campaign_start, datetime.min.time())

        # Campaign duration between 7 days to 90 days
        duration_days = random.randint(7, 90)
        campaign_end = campaign_start + timedelta(days=duration_days)

        # Channel first, then a platform that belongs to it
        channel = random.choice(list(self.channel_platforms))
        platform = random.choice(self.channel_platforms[channel])

        campaign_name = self._generate_campaign_name()

        # Target audience description
        age_group = random.choice(self.age_groups)
        demographic = random.choice(self.demographics)
        interest = random.choice(self.interests)
        target_audience = f"{age_group}, {demographic}, {interest}"

        audience_size = self._generate_audience_size(channel)

        # Budget, with 70-100% of it actually spent
        budget_usd = round(random.uniform(1000, 500000), 2)
        spend_usd = round(budget_usd * random.uniform(0.7, 1.0), 2)

        # Funnel: impressions -> clicks -> conversions
        impressions = self._generate_impressions(channel, spend_usd)
        clicks = self._generate_clicks(impressions)
        conversions = self._generate_conversions(clicks)

        # Rates (percentages) and unit costs
        click_through_rate = self._ratio(clicks, impressions, 100)
        conversion_rate = self._ratio(conversions, clicks, 100)
        cost_per_click = self._ratio(spend_usd, clicks)
        cost_per_conversion = self._ratio(spend_usd, conversions)

        # Revenue of $50-500 per conversion, and the resulting ROI
        revenue_usd = round(conversions * random.uniform(50, 500), 2)
        roi_percentage = self._ratio(revenue_usd - spend_usd, spend_usd, 100)

        # Geographic information
        region = random.choice(self.regions)
        country = random.choice(self.region_countries[region])

        # Status follows from where today falls relative to the campaign dates
        if campaign_start_date > current_date:
            status = 'Planned'
        elif current_date <= campaign_end.date():
            status = random.choice(['Active', 'Paused'])
        else:
            status = random.choice(['Completed', 'Cancelled'])

        # Creative information
        creative_type = random.choice(self.creative_types)
        creative_name = self._generate_creative_name(creative_type)

        # Engagement metrics
        likes, comments, shares = self._generate_engagement_metrics(impressions, channel)
        engagement_rate = self._ratio(likes + comments + shares, impressions, 100)

        # Additional metrics
        bounce_rate = round(random.uniform(20, 80), 2)  # 20-80%
        avg_session_duration = round(random.uniform(30, 600), 2)  # 30 s to 10 min

        # Leads: anywhere from none to 3x the conversions
        lead_count = random.randint(0, conversions * 3)
        cost_per_lead = self._ratio(spend_usd, lead_count)

        ad_frequency = round(random.uniform(1.0, 8.0), 2)

        agency_name = random.choice(self.agencies)
        manager_name = random.choice(self.managers)

        return {
            'campaign_id': self._generate_campaign_id(campaign_start),
            'campaign_name': campaign_name,
            'start_date': campaign_start.strftime('%Y-%m-%d'),
            'end_date': campaign_end.strftime('%Y-%m-%d'),
            'channel': channel,
            'platform': platform,
            'target_audience': target_audience,
            'budget_usd': budget_usd,
            'spend_usd': spend_usd,
            'impressions': impressions,
            'clicks': clicks,
            'click_through_rate': click_through_rate,
            'conversions': conversions,
            'conversion_rate': conversion_rate,
            'cost_per_click': cost_per_click,
            'cost_per_conversion': cost_per_conversion,
            'revenue_usd': revenue_usd,
            'roi_percentage': roi_percentage,
            'region': region,
            'country': country,
            'status': status,
            'creative_type': creative_type,
            'audience_size': audience_size,
            'bounce_rate': bounce_rate,
            'avg_session_duration': avg_session_duration,
            'lead_count': lead_count,
            'cost_per_lead': cost_per_lead,
            'engagement_rate': engagement_rate,
            'likes': likes,
            'comments': comments,
            'shares': shares,
            'ad_frequency': ad_frequency,
            'creative_id': self._generate_creative_id(),
            'creative_name': creative_name,
            'agency_name': agency_name,
            'manager_name': manager_name,
        }

    def _generate_campaign_id(self, campaign_date: datetime) -> str:
        """
        Generate campaign ID in format "CMP-YYYY-NNNNNN".

        Each call consumes the next value of the campaign counter.

        Args:
            campaign_date: Date of the campaign

        Returns:
            Formatted campaign ID
        """
        sequence = self._campaign_counter
        self._campaign_counter += 1
        return f"CMP-{campaign_date.year}-{sequence:06d}"

    def _generate_creative_id(self) -> str:
        """
        Generate creative ID in format "CRTV-NNNNNN".

        Each call consumes the next value of the creative counter.

        Returns:
            Formatted creative ID
        """
        sequence = self._creative_counter
        self._creative_counter += 1
        return f"CRTV-{sequence:06d}"

    def _generate_campaign_name(self) -> str:
        """
        Generate realistic campaign name.

        A template is picked at random and its ``{placeholder}`` markers are
        substituted with randomly chosen values.

        Returns:
            Campaign name string
        """
        template = random.choice(self.campaign_name_templates)

        # Every value is drawn even if the chosen template does not use it,
        # which keeps the number of random draws per name constant.
        replacements = {
            'type': random.choice(self.campaign_types),
            'quarter': random.randint(1, 4),
            'year': random.randint(2023, 2025),
            'season': random.choice(self.seasons),
            'month': random.choice(self.months),
            'product': random.choice(self.products),
            'brand': random.choice(self.brands),
        }

        name = template
        for key, value in replacements.items():
            name = name.replace(f'{{{key}}}', str(value))
        return name

    def _generate_creative_name(self, creative_type: str) -> str:
        """
        Generate creative name based on type.

        Args:
            creative_type: Type of creative

        Returns:
            Creative name string

        Raises:
            KeyError: If creative_type has no entry in the template table
        """
        base_name = random.choice(self._CREATIVE_NAME_TEMPLATES[creative_type])
        version = random.choice(self._CREATIVE_VERSIONS)
        return f"{base_name} {version}"

    def _generate_audience_size(self, channel: str) -> int:
        """
        Generate audience size based on channel.

        Args:
            channel: Marketing channel

        Returns:
            Audience size as integer
        """
        min_size, max_size = self._AUDIENCE_SIZE_RANGES.get(
            channel, self._DEFAULT_AUDIENCE_SIZE_RANGE
        )
        return random.randint(min_size, max_size)

    def _generate_impressions(self, channel: str, spend: float) -> int:
        """
        Generate impressions based on channel and spend.

        Args:
            channel: Marketing channel
            spend: Campaign spend

        Returns:
            Number of impressions
        """
        min_cpm, max_cpm = self._CPM_RANGES.get(channel, self._DEFAULT_CPM_RANGE)
        cpm = random.uniform(min_cpm, max_cpm)

        # CPM is the cost of 1000 impressions: impressions = spend / (cpm / 1000)
        impressions = int((spend / cpm) * 1000)

        # Add +/-20% randomness around the computed figure
        return random.randint(int(impressions * 0.8), int(impressions * 1.2))

    def _generate_clicks(self, impressions: int) -> int:
        """
        Generate clicks based on impressions.

        Args:
            impressions: Number of impressions

        Returns:
            Number of clicks
        """
        # CTR drawn between 0.1% and 8%
        ctr = random.uniform(0.001, 0.08)
        return max(0, int(impressions * ctr))  # Ensure non-negative

    def _generate_conversions(self, clicks: int) -> int:
        """
        Generate conversions based on clicks.

        Args:
            clicks: Number of clicks

        Returns:
            Number of conversions
        """
        # Conversion rate drawn between 0.5% and 15%
        conversion_rate = random.uniform(0.005, 0.15)
        return max(0, int(clicks * conversion_rate))  # Ensure non-negative

    def _generate_engagement_metrics(self, impressions: int, channel: str) -> tuple:
        """
        Generate likes, comments, and shares based on impressions and channel.

        Args:
            impressions: Number of impressions
            channel: Marketing channel

        Returns:
            Tuple of (likes, comments, shares)
        """
        # Engagement rates vary by channel
        if channel == 'Social Media':
            # Higher engagement for social media
            likes_rate = random.uniform(0.01, 0.05)  # 1-5%
            comments_rate = random.uniform(0.001, 0.01)  # 0.1-1%
            shares_rate = random.uniform(0.001, 0.005)  # 0.1-0.5%
        elif channel in ('Email', 'Search Engine'):
            # Lower engagement for non-social digital channels
            likes_rate = random.uniform(0.001, 0.01)  # 0.1-1%
            comments_rate = random.uniform(0.0001, 0.005)  # 0.01-0.5%
            shares_rate = random.uniform(0.0001, 0.002)  # 0.01-0.2%
        else:
            # Minimal engagement for every other channel
            likes_rate = random.uniform(0.0001, 0.005)
            comments_rate = random.uniform(0.00001, 0.001)
            shares_rate = random.uniform(0.00001, 0.0005)

        likes = int(impressions * likes_rate)
        comments = int(impressions * comments_rate)
        shares = int(impressions * shares_rate)

        return max(0, likes), max(0, comments), max(0, shares)

    def get_schema(self) -> Dict[str, str]:
        """
        Return column schema with types.

        Returns:
            Dictionary mapping column names to their data types
        """
        return {
            'campaign_id': 'string',
            'campaign_name': 'string',
            'start_date': 'date',
            'end_date': 'date',
            'channel': 'string',
            'platform': 'string',
            'target_audience': 'string',
            'budget_usd': 'float',
            'spend_usd': 'float',
            'impressions': 'integer',
            'clicks': 'integer',
            'click_through_rate': 'float',
            'conversions': 'integer',
            'conversion_rate': 'float',
            'cost_per_click': 'float',
            'cost_per_conversion': 'float',
            'revenue_usd': 'float',
            'roi_percentage': 'float',
            'region': 'string',
            'country': 'string',
            'status': 'string',
            'creative_type': 'string',
            'audience_size': 'integer',
            'bounce_rate': 'float',
            'avg_session_duration': 'float',
            'lead_count': 'integer',
            'cost_per_lead': 'float',
            'engagement_rate': 'float',
            'likes': 'integer',
            'comments': 'integer',
            'shares': 'integer',
            'ad_frequency': 'float',
            'creative_id': 'string',
            'creative_name': 'string',
            'agency_name': 'string',
            'manager_name': 'string',
        }