"""
Portfolio Management and Risk Analytics Module
Comprehensive portfolio management with advanced risk metrics
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import scipy.stats as stats
from dataclasses import dataclass
[docs]
@dataclass
class RiskMetrics:
"""Risk metrics container"""
var_95: float # Value at Risk (95%)
var_99: float # Value at Risk (99%)
cvar_95: float # Conditional VaR (95%)
cvar_99: float # Conditional VaR (99%)
skewness: float
kurtosis: float
maximum_drawdown: float
calmar_ratio: float
sterling_ratio: float
burke_ratio: float
[docs]
class AdvancedPortfolioAnalytics:
"""Advanced portfolio analytics and risk management"""
[docs]
def __init__(self, returns: np.ndarray, benchmark_returns: Optional[np.ndarray] = None):
self.returns = returns
self.benchmark_returns = benchmark_returns
[docs]
def calculate_var(self, confidence_level: float = 0.05) -> float:
"""Calculate Value at Risk"""
if len(self.returns) == 0:
return 0.0
return np.percentile(self.returns, confidence_level * 100)
[docs]
def calculate_cvar(self, confidence_level: float = 0.05) -> float:
"""Calculate Conditional Value at Risk (Expected Shortfall)"""
var = self.calculate_var(confidence_level)
cvar_returns = self.returns[self.returns <= var]
return np.mean(cvar_returns) if len(cvar_returns) > 0 else 0.0
[docs]
def calculate_maximum_drawdown(self, equity_curve: np.ndarray) -> Tuple[float, int, int]:
"""Calculate maximum drawdown and duration"""
if len(equity_curve) == 0:
return 0.0, 0, 0
peak = np.maximum.accumulate(equity_curve)
drawdown = (equity_curve - peak) / peak
max_dd = np.min(drawdown)
max_dd_idx = np.argmin(drawdown)
# Find the peak before max drawdown
peak_idx = np.argmax(equity_curve[:max_dd_idx+1])
# Find recovery point
recovery_idx = max_dd_idx
for i in range(max_dd_idx, len(equity_curve)):
if equity_curve[i] >= peak[max_dd_idx]:
recovery_idx = i
break
duration = recovery_idx - peak_idx
return max_dd, duration, max_dd_idx
[docs]
def calculate_ulcer_index(self, equity_curve: np.ndarray) -> float:
"""Calculate Ulcer Index - measure of downside risk"""
if len(equity_curve) == 0:
return 0.0
peak = np.maximum.accumulate(equity_curve)
drawdown = (equity_curve - peak) / peak * 100
squared_drawdowns = drawdown ** 2
ulcer_index = np.sqrt(np.mean(squared_drawdowns))
return ulcer_index
[docs]
def calculate_burke_ratio(self, equity_curve: np.ndarray) -> float:
"""Calculate Burke Ratio"""
if len(self.returns) == 0 or len(equity_curve) == 0:
return 0.0
excess_return = np.mean(self.returns) * 252 # Annualized
ulcer_index = self.calculate_ulcer_index(equity_curve)
return excess_return / ulcer_index if ulcer_index != 0 else 0.0
[docs]
def calculate_sterling_ratio(self, equity_curve: np.ndarray) -> float:
"""Calculate Sterling Ratio"""
if len(self.returns) == 0:
return 0.0
annual_return = np.mean(self.returns) * 252
max_dd, _, _ = self.calculate_maximum_drawdown(equity_curve)
# Sterling ratio uses average drawdown, approximated as max_dd * 0.7
avg_drawdown = abs(max_dd) * 0.7
return annual_return / avg_drawdown if avg_drawdown != 0 else 0.0
[docs]
def calculate_tracking_error(self) -> float:
"""Calculate tracking error vs benchmark"""
if self.benchmark_returns is None or len(self.returns) != len(self.benchmark_returns):
return 0.0
excess_returns = self.returns - self.benchmark_returns
return np.std(excess_returns) * np.sqrt(252)
[docs]
def calculate_beta(self) -> float:
"""Calculate Beta vs benchmark"""
if self.benchmark_returns is None or len(self.returns) != len(self.benchmark_returns):
return 1.0
covariance = np.cov(self.returns, self.benchmark_returns)[0, 1]
benchmark_variance = np.var(self.benchmark_returns)
return covariance / benchmark_variance if benchmark_variance != 0 else 1.0
[docs]
def calculate_alpha(self, risk_free_rate: float = 0.0) -> float:
"""Calculate Alpha vs benchmark"""
if self.benchmark_returns is None:
return 0.0
beta = self.calculate_beta()
portfolio_return = np.mean(self.returns) * 252
benchmark_return = np.mean(self.benchmark_returns) * 252
alpha = portfolio_return - (risk_free_rate + beta * (benchmark_return - risk_free_rate))
return alpha
[docs]
def calculate_treynor_ratio(self, risk_free_rate: float = 0.0) -> float:
"""Calculate Treynor Ratio"""
if self.benchmark_returns is None:
return 0.0
beta = self.calculate_beta()
portfolio_return = np.mean(self.returns) * 252
return (portfolio_return - risk_free_rate) / beta if beta != 0 else 0.0
[docs]
def calculate_modigliani_ratio(self, risk_free_rate: float = 0.0) -> float:
"""Calculate Modigliani-Modigliani Ratio"""
if self.benchmark_returns is None:
return 0.0
portfolio_return = np.mean(self.returns) * 252
portfolio_vol = np.std(self.returns) * np.sqrt(252)
benchmark_vol = np.std(self.benchmark_returns) * np.sqrt(252)
if portfolio_vol == 0:
return 0.0
adjusted_return = risk_free_rate + (portfolio_return - risk_free_rate) * (benchmark_vol / portfolio_vol)
benchmark_return = np.mean(self.benchmark_returns) * 252
return adjusted_return - benchmark_return
[docs]
def calculate_omega_ratio(self, threshold: float = 0.0) -> float:
"""Calculate Omega Ratio"""
if len(self.returns) == 0:
return 1.0
excess_returns = self.returns - threshold
gains = excess_returns[excess_returns > 0]
losses = excess_returns[excess_returns < 0]
if len(losses) == 0:
return float('inf')
if len(gains) == 0:
return 0.0
return np.sum(gains) / abs(np.sum(losses))
[docs]
def calculate_kappa_ratio(self, order: int = 3, threshold: float = 0.0) -> float:
"""Calculate Kappa Ratio (generalized downside risk measure)"""
if len(self.returns) == 0:
return 0.0
excess_returns = self.returns - threshold
downside_returns = excess_returns[excess_returns < 0]
if len(downside_returns) == 0:
return float('inf')
lower_partial_moment = np.mean(np.abs(downside_returns) ** order) ** (1/order)
mean_excess_return = np.mean(excess_returns)
return mean_excess_return / lower_partial_moment if lower_partial_moment != 0 else 0.0
[docs]
def calculate_gain_pain_ratio(self) -> float:
"""Calculate Gain-to-Pain Ratio"""
if len(self.returns) == 0:
return 0.0
gains = np.sum(self.returns[self.returns > 0])
losses = abs(np.sum(self.returns[self.returns < 0]))
return gains / losses if losses != 0 else float('inf')
[docs]
def calculate_comprehensive_risk_metrics(self, equity_curve: np.ndarray) -> RiskMetrics:
"""Calculate comprehensive risk metrics"""
var_95 = self.calculate_var(0.05)
var_99 = self.calculate_var(0.01)
cvar_95 = self.calculate_cvar(0.05)
cvar_99 = self.calculate_cvar(0.01)
skewness = stats.skew(self.returns) if len(self.returns) > 0 else 0.0
kurtosis = stats.kurtosis(self.returns) if len(self.returns) > 0 else 0.0
max_dd, _, _ = self.calculate_maximum_drawdown(equity_curve)
# Calmar ratio
annual_return = np.mean(self.returns) * 252 if len(self.returns) > 0 else 0.0
calmar_ratio = annual_return / abs(max_dd) if max_dd != 0 else 0.0
sterling_ratio = self.calculate_sterling_ratio(equity_curve)
burke_ratio = self.calculate_burke_ratio(equity_curve)
return RiskMetrics(
var_95=var_95,
var_99=var_99,
cvar_95=cvar_95,
cvar_99=cvar_99,
skewness=skewness,
kurtosis=kurtosis,
maximum_drawdown=max_dd,
calmar_ratio=calmar_ratio,
sterling_ratio=sterling_ratio,
burke_ratio=burke_ratio
)
[docs]
class PositionSizing:
"""Position sizing and risk management"""
[docs]
@staticmethod
def kelly_criterion(win_rate: float, avg_win: float, avg_loss: float) -> float:
"""Calculate Kelly Criterion optimal position size"""
if avg_loss == 0 or win_rate == 0:
return 0.0
win_loss_ratio = avg_win / abs(avg_loss)
kelly_fraction = (win_rate * win_loss_ratio - (1 - win_rate)) / win_loss_ratio
# Cap Kelly fraction at 25% for safety
return max(0.0, min(kelly_fraction, 0.25))
[docs]
@staticmethod
def fixed_fractional(account_equity: float, risk_per_trade: float, stop_loss_pct: float) -> float:
"""Calculate position size using fixed fractional method"""
if stop_loss_pct == 0:
return 0.0
risk_amount = account_equity * risk_per_trade
position_size = risk_amount / stop_loss_pct
return position_size
[docs]
@staticmethod
def volatility_position_sizing(account_equity: float, target_volatility: float,
asset_volatility: float, correlation_adjustment: float = 1.0) -> float:
"""Calculate position size based on volatility targeting"""
if asset_volatility == 0:
return 0.0
leverage = target_volatility / (asset_volatility * correlation_adjustment)
position_size = account_equity * leverage
return position_size
[docs]
@staticmethod
def risk_parity_weights(covariance_matrix: np.ndarray) -> np.ndarray:
"""Calculate risk parity portfolio weights"""
n_assets = covariance_matrix.shape[0]
# Start with equal weights
weights = np.ones(n_assets) / n_assets
# Iterative algorithm to achieve risk parity
for _ in range(100): # Max iterations
portfolio_vol = np.sqrt(weights.T @ covariance_matrix @ weights)
marginal_contrib = (covariance_matrix @ weights) / portfolio_vol
contrib = weights * marginal_contrib
# Target equal risk contribution
target_contrib = portfolio_vol / n_assets
# Update weights
weights = weights * target_contrib / contrib
weights = weights / np.sum(weights) # Normalize
# Check convergence
if np.max(np.abs(contrib - target_contrib)) < 1e-6:
break
return weights
# Export main classes
__all__ = ['AdvancedPortfolioAnalytics', 'PositionSizing', 'PerformanceAttribution', 'RiskMetrics']