"""
TempDataFrame class for data manipulation.
Provides a lightweight alternative to pandas DataFrame with essential data exploration methods.
"""
import csv
import json
import sys
from typing import List, Dict, Any, Tuple, Union
from ..exceptions import ValidationError, CSVWriteError, JSONWriteError
class ShapeDescriptor:
    """
    Descriptor that lets ``shape`` be used both as an attribute and as a method.

    Accessing the attribute on an instance returns a small proxy object that
    wraps the ``(rows, columns)`` tuple computed from the owner's ``_data`` and
    ``_columns`` attributes. The proxy can be called, unpacked, indexed,
    compared with a tuple, hashed and printed.
    """

    class ShapeCallable:
        """Tuple-like view of a shape that can also be called like a method."""

        def __init__(self, data, columns):
            self._data = data
            self._columns = columns
            # Snapshot taken at access time; later mutations of the owner do
            # not change an already-obtained shape object.
            self._shape = (len(data), len(columns))

        def __call__(self):
            """Allow calling as method: df.shape()"""
            return self._shape

        def __iter__(self):
            """Allow tuple unpacking: rows, cols = df.shape"""
            return iter(self._shape)

        def __getitem__(self, index):
            """Allow indexing: df.shape[0], df.shape[1]"""
            return self._shape[index]

        def __eq__(self, other):
            """Allow comparison: df.shape == (10, 5)"""
            return self._shape == other

        def __hash__(self):
            """Hash like the underlying tuple, consistent with __eq__."""
            return hash(self._shape)

        def __repr__(self):
            """String representation"""
            return repr(self._shape)

        def __str__(self):
            """String representation"""
            return str(self._shape)

        def __len__(self):
            """Length (always 2 for shape tuple)"""
            return 2

    def __get__(self, obj, objtype=None):
        """
        Return the descriptor itself on class access, a shape proxy on instance access.

        Args:
            obj: Instance the attribute was looked up on, or None for class access
            objtype: Owner class (unused)
        Returns:
            This descriptor when obj is None, otherwise a ShapeCallable built
            from obj._data and obj._columns
        """
        if obj is None:
            return self
        # The proxy class is defined once on the descriptor instead of being
        # re-created on every attribute access.
        return self.ShapeCallable(obj._data, obj._columns)
class DisplayFormatter:
    """
    Helper class to format output for Jupyter/Colab display.

    Wraps a plain-text rendering so that it prints as-is in a terminal,
    compares equal to the underlying string, supports the ``in`` operator and
    exposes ``_repr_html_`` for notebook front ends.
    """

    def __init__(self, content: str):
        """
        Args:
            content: Plain-text rendering to wrap
        """
        self.content = content

    def __str__(self) -> str:
        return self.content

    def __repr__(self) -> str:
        return self.content

    def __eq__(self, other) -> bool:
        """Enable comparison with strings for testing."""
        if isinstance(other, str):
            return self.content == other
        elif isinstance(other, DisplayFormatter):
            return self.content == other.content
        return False

    def __contains__(self, item) -> bool:
        """Enable 'in' operator for string searching."""
        return item in self.content

    def _repr_html_(self) -> str:
        """
        HTML representation for Jupyter notebooks.

        Returns:
            The content wrapped in a <pre> element so spacing is preserved
        """
        import html

        # Escape markup characters so text such as "<class '...'>" is shown
        # literally instead of being interpreted as an HTML tag.
        escaped = html.escape(self.content, quote=False)
        return f"<pre style='font-family: monospace; white-space: pre;'>{escaped}</pre>"
class TempDataFrame:
    """
    Lightweight DataFrame-like class for data manipulation and exploration.

    Provides essential methods for working with tabular data without pandas
    dependency. Rows are stored as dictionaries; ``columns`` defines which keys
    are shown and in which order. Keys missing from a row are read with
    ``dict.get`` and therefore treated as absent/None rather than raising.
    """

    # Use custom descriptor for shape to support both property and method access
    shape = ShapeDescriptor()

    def __init__(self, data: List[Dict[str, Any]], columns: List[str]):
        """
        Initialize TempDataFrame.

        Args:
            data: List of dictionaries representing rows
            columns: List of column names
        Raises:
            ValidationError: If parameters are invalid
        """
        if not isinstance(data, list):
            raise ValidationError("data", data, "list of dictionaries")
        if not isinstance(columns, list):
            raise ValidationError("columns", columns, "list of strings")
        if not all(isinstance(col, str) for col in columns):
            raise ValidationError("columns", columns, "list of strings")
        # all() over an empty list is True, so empty data passes this check.
        if not all(isinstance(row, dict) for row in data):
            raise ValidationError("data", data, "list of dictionaries")
        self._data = data
        self._columns = columns

    def __len__(self) -> int:
        """
        Return the number of rows in the DataFrame.

        Returns:
            Number of rows
        """
        return len(self._data)

    @property
    def columns(self) -> List[str]:
        """
        Return column names.

        Returns:
            A copy of the list of column names
        """
        return self._columns.copy()

    # ------------------------------------------------------------------
    # Display helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _validate_row_count(n: int) -> None:
        """Raise ValidationError unless n is an integer greater than zero."""
        if not isinstance(n, int):
            raise ValidationError("n", n, "integer")
        if n <= 0:
            raise ValidationError("n", n, "positive integer")

    @staticmethod
    def _emit(text: str) -> DisplayFormatter:
        """Wrap text in a DisplayFormatter, print it, and return the wrapper."""
        result = DisplayFormatter(text)
        # Auto-print in script context
        print(result.content)
        return result

    def head(self, n: int = 5) -> DisplayFormatter:
        """
        Display first n rows in a readable format.

        Args:
            n: Number of rows to display (default: 5)
        Returns:
            DisplayFormatter object with formatted representation of the first n rows
        Raises:
            ValidationError: If n is not a positive integer
        """
        self._validate_row_count(n)
        # _format_rows renders "Empty DataFrame" when there is nothing to show.
        return self._emit(self._format_rows(self._data[:n]))

    def tail(self, n: int = 5) -> DisplayFormatter:
        """
        Display last n rows in a readable format.

        Args:
            n: Number of rows to display (default: 5)
        Returns:
            DisplayFormatter object with formatted representation of the last n rows
        Raises:
            ValidationError: If n is not a positive integer
        """
        self._validate_row_count(n)
        return self._emit(self._format_rows(self._data[-n:]))

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    @staticmethod
    def _is_number(value: Any) -> bool:
        """Return True for int/float values; bool and None do not count."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _quantile(sorted_values: List[float], q: float) -> float:
        """Return the q-quantile of non-empty sorted values by linear interpolation."""
        position = (len(sorted_values) - 1) * q
        lower = int(position)
        fraction = position - lower
        if fraction == 0:
            return sorted_values[lower]
        upper = min(lower + 1, len(sorted_values) - 1)
        return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction

    @classmethod
    def _summarize(cls, sorted_values: List[float]) -> Dict[str, float]:
        """Compute the describe() statistics for one non-empty sorted column."""
        n = len(sorted_values)
        mean = sum(sorted_values) / n
        # Sample standard deviation (n - 1 denominator); 0.0 for a single value.
        if n > 1:
            std = (sum((x - mean) ** 2 for x in sorted_values) / (n - 1)) ** 0.5
        else:
            std = 0.0
        return {
            'count': n,
            'mean': mean,
            'std': std,
            'min': sorted_values[0],
            '25%': cls._quantile(sorted_values, 0.25),
            '50%': cls._quantile(sorted_values, 0.50),
            '75%': cls._quantile(sorted_values, 0.75),
            'max': sorted_values[-1],
        }

    def describe(self) -> DisplayFormatter:
        """
        Generate descriptive statistics for numeric columns.

        A column is included when at least one of its values is an int or
        float (booleans excluded); non-numeric and None values in such a
        column are skipped. Percentiles use linear interpolation between the
        two nearest ranks.

        Returns:
            DisplayFormatter object with statistical summary
        """
        if not self._data:
            return self._emit("Empty DataFrame")

        # Column name -> statistics, in column order.
        stats = {}
        for col in self._columns:
            values = sorted(
                float(value)
                for value in (row.get(col) for row in self._data)
                if self._is_number(value)
            )
            if values:
                stats[col] = self._summarize(values)

        if not stats:
            return self._emit("No numeric columns found")

        # Format output similar to pandas describe()
        stat_names = ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']
        col_widths = {col: max(len(col), 10) for col in stats}  # Minimum width of 10

        # The first cell is blank: it sits above the statistic names.
        header_parts = [''.ljust(8)]
        header_parts.extend(col.rjust(col_widths[col]) for col in stats)
        lines = [' '.join(header_parts)]

        for stat_name in stat_names:
            row_parts = [stat_name.ljust(8)]
            for col, summary in stats.items():
                value = summary[stat_name]
                if stat_name == 'count':
                    formatted_value = f"{int(value)}"
                else:
                    formatted_value = f"{value:.6f}"
                row_parts.append(formatted_value.rjust(col_widths[col]))
            lines.append(' '.join(row_parts))

        return self._emit('\n'.join(lines))

    # ------------------------------------------------------------------
    # Dataset information
    # ------------------------------------------------------------------

    def _infer_dtype(self, col: str) -> str:
        """
        Name the dtype of a column from its first non-null value.

        Returns:
            "bool", "int64", "float64" or "object" ("object" also when every
            value is None)
        """
        for row in self._data:
            value = row.get(col)
            if value is None:
                continue
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(value, bool):
                return "bool"
            if isinstance(value, int):
                return "int64"
            if isinstance(value, float):
                return "float64"
            return "object"
        return "object"

    def info(self) -> DisplayFormatter:
        """
        Display dataset information including column types and memory usage.

        Returns:
            DisplayFormatter object with dataset information
        """
        if not self._data:
            return self._emit("Empty DataFrame")

        rows, cols = self.shape
        info_lines = [
            "<class 'tempdataset.core.utils.data_frame.TempDataFrame'>",
            f"RangeIndex: {rows} entries, 0 to {rows-1}",
            f"Data columns (total {cols} columns):",
            " # Column" + " " * 15 + "Non-Null Count Dtype",
            "--- ------" + " " * 15 + "-------------- -----",
        ]
        for i, col in enumerate(self._columns):
            non_null = sum(1 for row in self._data if row.get(col) is not None)
            dtype = self._infer_dtype(col)
            col_name = col[:20]  # Truncate long column names
            info_lines.append(f" {i:<3} {col_name:<20} {non_null} non-null {dtype}")
        info_lines.append(f"dtypes: {self._get_dtype_counts()}")
        info_lines.append(f"memory usage: {self._estimate_memory_usage()}")
        return self._emit("\n".join(info_lines))

    def _get_dtype_counts(self) -> str:
        """
        Get counts of different data types.

        Returns:
            String of the form "type(count), type(count), ..." sorted by type
            name, or "no data" for an empty DataFrame
        """
        if not self._data:
            return "no data"
        dtype_counts = {}
        for col in self._columns:
            dtype = self._infer_dtype(col)
            dtype_counts[dtype] = dtype_counts.get(dtype, 0) + 1
        parts = [f"{dtype}({count})" for dtype, count in sorted(dtype_counts.items())]
        return ", ".join(parts)

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    def to_csv(self, filename: str) -> None:
        """
        Export to CSV file.

        Only the declared columns are written; keys missing from a row become
        empty cells and keys not listed in the columns are ignored. An empty
        DataFrame produces a file containing just the header row.

        Args:
            filename: Path to output CSV file
        Raises:
            ValidationError: If filename is invalid
            CSVWriteError: If CSV writing fails
        """
        if not isinstance(filename, str):
            raise ValidationError("filename", filename, "string")
        if not filename.strip():
            raise ValidationError("filename", filename, "non-empty string")
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(
                    csvfile,
                    fieldnames=self._columns,
                    # Match the rest of the class, which reads rows through
                    # the declared columns and ignores any other keys.
                    extrasaction='ignore',
                )
                writer.writeheader()
                writer.writerows(self._data)
        except (ValidationError, CSVWriteError):
            raise  # Re-raise our custom exceptions
        except Exception as e:
            # Any I/O or serialization failure is reported as CSVWriteError.
            raise CSVWriteError(filename, e) from e

    def to_json(self, filename: str) -> None:
        """
        Export to JSON file.

        Rows are written as a JSON array of objects with all of their keys.
        Values the json module cannot encode are converted with str().

        Args:
            filename: Path to output JSON file
        Raises:
            ValidationError: If filename is invalid
            JSONWriteError: If JSON writing fails
        """
        if not isinstance(filename, str):
            raise ValidationError("filename", filename, "string")
        if not filename.strip():
            raise ValidationError("filename", filename, "non-empty string")
        try:
            with open(filename, 'w', encoding='utf-8') as jsonfile:
                json.dump(self._data, jsonfile, indent=2, default=str)
        except (ValidationError, JSONWriteError):
            raise  # Re-raise our custom exceptions
        except Exception as e:
            # Any I/O or serialization failure is reported as JSONWriteError.
            raise JSONWriteError(filename, e) from e

    def to_dict(self) -> List[Dict[str, Any]]:
        """
        Convert DataFrame to list of dictionaries.

        Returns:
            List of shallow copies of the row dictionaries
        """
        return [row.copy() for row in self._data]

    # ------------------------------------------------------------------
    # Text rendering
    # ------------------------------------------------------------------

    def _format_rows(self, rows: List[Dict[str, Any]]) -> str:
        """
        Format rows for display.

        Values longer than 20 characters are shortened and end in "...".
        Column names are always shown in full, and the column is padded to
        the name's width so header, separator and values stay aligned.

        Args:
            rows: List of row dictionaries to format
        Returns:
            Formatted string representation
        """
        if not rows:
            return "Empty DataFrame"

        value_widths = {}  # longest value text allowed per column
        col_widths = {}    # padded width of each column
        for col in self._columns:
            longest = max(len(str(row.get(col, ''))) for row in rows)
            # At least 3 characters wide, at most 20 for readability.
            value_widths[col] = min(max(len(col), longest, 3), 20)
            col_widths[col] = max(value_widths[col], len(col))

        lines = [
            ' '.join(col.ljust(col_widths[col]) for col in self._columns),
            ' '.join('-' * col_widths[col] for col in self._columns),
        ]
        for row in rows:
            row_parts = []
            for col in self._columns:
                value_str = str(row.get(col, ''))
                limit = value_widths[col]
                if len(value_str) > limit:
                    # Truncate if too long
                    value_str = value_str[:limit - 3] + '...'
                row_parts.append(value_str.ljust(col_widths[col]))
            lines.append(' '.join(row_parts))
        return '\n'.join(lines)

    # ------------------------------------------------------------------
    # Memory estimation
    # ------------------------------------------------------------------

    def _estimate_bytes(self) -> int:
        """
        Roughly estimate the size in bytes of the stored data.

        Sums sys.getsizeof over the row list, the column list, every row
        dictionary and every value. sys.getsizeof is shallow, so this is an
        approximation only.
        """
        total_bytes = sys.getsizeof(self._data) + sys.getsizeof(self._columns)
        for row in self._data:
            total_bytes += sys.getsizeof(row)
            total_bytes += sum(sys.getsizeof(value) for value in row.values())
        return total_bytes

    def _estimate_memory_usage(self) -> str:
        """
        Estimate memory usage of the DataFrame.

        Returns:
            Human-readable memory usage string ("N bytes", "N.N KB" or "N.N MB")
        """
        if not self._data:
            return "0 bytes"
        total_bytes = self._estimate_bytes()
        if total_bytes < 1024:
            return f"{total_bytes} bytes"
        elif total_bytes < 1024 * 1024:
            return f"{total_bytes / 1024:.1f} KB"
        else:
            return f"{total_bytes / (1024 * 1024):.1f} MB"

    def memory_usage(self) -> float:
        """
        Get memory usage in megabytes.

        Returns:
            Estimated memory usage in MB as a float (0.0 for an empty DataFrame)
        """
        if not self._data:
            return 0.0
        return self._estimate_bytes() / (1024 * 1024)

    # ------------------------------------------------------------------
    # Row / column selection
    # ------------------------------------------------------------------

    def filter(self, condition_func) -> 'TempDataFrame':
        """
        Filter rows based on a condition function.

        Args:
            condition_func: Function that takes a row dict and returns True/False
        Returns:
            New TempDataFrame with copies of the rows for which the function
            returned a truthy value
        Raises:
            ValidationError: If condition_func is not callable
        """
        if not callable(condition_func):
            raise ValidationError("condition_func", condition_func, "callable function")
        filtered_data = []
        for row in self._data:
            try:
                keep = condition_func(row)
            except Exception:
                # Deliberate best effort: rows the condition cannot evaluate
                # (e.g. missing key, None comparison) are left out.
                continue
            if keep:
                filtered_data.append(row.copy())
        return TempDataFrame(filtered_data, self._columns.copy())

    def select(self, columns: List[str]) -> 'TempDataFrame':
        """
        Select specific columns from the DataFrame.

        Args:
            columns: List of column names to select
        Returns:
            New TempDataFrame with selected columns only
        Raises:
            ValidationError: If columns parameter is invalid or contains non-existent columns
        """
        if not isinstance(columns, list):
            raise ValidationError("columns", columns, "list of strings")
        if not all(isinstance(col, str) for col in columns):
            raise ValidationError("columns", columns, "list of strings")
        missing_cols = [col for col in columns if col not in self._columns]
        if missing_cols:
            raise ValidationError("columns", missing_cols, f"columns that exist in DataFrame. Available columns: {self._columns}")
        # Keys absent from a row come out as None in the new row.
        selected_data = [{col: row.get(col) for col in columns} for row in self._data]
        return TempDataFrame(selected_data, columns.copy())