Risk Management Engine
Five-factor composite risk scoring, ML-powered anomaly detection with an ensemble of Isolation Forest, Markov behavioral models, entropy drift detection, and statistical z-scores—backed by circuit breakers, counterparty profiling, and on-chain insurance primitives.
Composite Scoring
Five weighted factors, one score
Every transaction is evaluated against a composite risk score computed from five orthogonal dimensions. Each factor contributes a weighted percentage to the final score in the range [0, 1].
Combined weight distribution
def compute_composite_risk(tx: Transaction, agent: Agent) -> float:
"""Weighted five-factor risk score in [0, 1]."""
factors = {
"authority_compliance": check_authority_bounds(tx, agent), # 0.30
"circuit_breaker": get_circuit_breaker_score(agent), # 0.25
"behavioral_anomaly": anomaly_score(tx, agent), # 0.20
"counterparty_risk": assess_counterparty(tx.counterparty),# 0.15
"concentration_risk": concentration_score(tx, agent), # 0.10
}
WEIGHTS = {
"authority_compliance": 0.30,
"circuit_breaker": 0.25,
"behavioral_anomaly": 0.20,
"counterparty_risk": 0.15,
"concentration_risk": 0.10,
}
composite = sum(factors[k] * WEIGHTS[k] for k in factors)
return min(max(composite, 0.0), 1.0)Risk Levels
Six thresholds from safe to blocked
The composite risk score maps to six discrete levels. Each level triggers a different protocol response—from silent pass-through to complete transaction blocking.
MINIMAL
<0.1
LOW
0.1–0.3
MODERATE
0.3–0.5
HIGH
0.5–0.7
CRITICAL
0.7–0.9
BLOCKED
≥0.9
class RiskLevel(Enum):
MINIMAL = "minimal" # < 0.1 — silent pass-through
LOW = "low" # 0.1–0.3 — log only
MODERATE = "moderate" # 0.3–0.5 — require additional verification
HIGH = "high" # 0.5–0.7 — hold for manual review
CRITICAL = "critical" # 0.7–0.9 — auto-reject, alert operator
BLOCKED = "blocked" # ≥ 0.9 — hard block, freeze agent
def classify_risk(score: float) -> RiskLevel:
if score < 0.1: return RiskLevel.MINIMAL
if score < 0.3: return RiskLevel.LOW
if score < 0.5: return RiskLevel.MODERATE
if score < 0.7: return RiskLevel.HIGH
if score < 0.9: return RiskLevel.CRITICAL
return RiskLevel.BLOCKEDBehavioral Analysis
Anomaly scoring algorithm
The behavioral anomaly detector computes an additive score from six independent triggers. Each trigger contributes a fixed penalty. The sum is capped at 1.0 and feeds into the composite risk score with a 20% weight.
Extreme statistical outlier in transaction behavior
Sudden increase in transaction volume
Rapid burst of transaction frequency
Moderate statistical deviation from baseline
First-time interaction with unknown agent
Transaction type not seen in recent history
def anomaly_score(tx: Transaction, agent: Agent) -> float:
"""Additive behavioral anomaly score, capped at 1.0."""
score = 0.0
stats = agent.behavioral_stats
# Statistical z-score thresholds
z = abs(tx.value - stats.mean_value) / stats.std_value
if z > 3.0:
score += 0.30 # extreme outlier
elif z > 2.0:
score += 0.15 # moderate deviation
# Counterparty novelty
if tx.counterparty not in agent.known_counterparties:
score += 0.15
# Transaction type novelty
if tx.tx_type not in stats.recent_tx_types:
score += 0.10
# Volume spike detection
if tx.value > 3.0 * stats.daily_avg_volume:
score += 0.25
# Velocity spike detection
if stats.tx_count_last_hour > 3.0 * stats.hourly_avg_count:
score += 0.20
return min(score, 1.0)Circuit Breakers
Three states, automatic recovery
Circuit breakers protect the network from cascading failures. When an agent accumulates 5+ consecutive failures, transactions are blocked. After a 300-second cooldown, the breaker enters a half-open state allowing limited test transactions.
CLOSED
Normal operation
OPEN
All transactions blocked
HALF_OPEN
Testing with limited tx
→ CLOSED
→ OPEN
Configuration
failure_threshold
Consecutive failures before opening
reset_timeout
Cooldown before half-open state
test_tx_required
Successful tests to close
max_test_value
Max value during half-open test
class CircuitBreakerState(Enum):
CLOSED = "closed" # normal operation
OPEN = "open" # all transactions blocked
HALF_OPEN = "half_open" # testing recovery
class CircuitBreaker:
failure_threshold: int = 5
reset_timeout: float = 300.0 # seconds
test_tx_required: int = 3
def record_failure(self):
self.consecutive_failures += 1
if self.consecutive_failures >= self.failure_threshold:
self.state = CircuitBreakerState.OPEN
self.opened_at = now()
def can_execute(self, tx: Transaction) -> bool:
match self.state:
case CLOSED:
return True
case OPEN:
if elapsed(self.opened_at) >= self.reset_timeout:
self.state = HALF_OPEN
self.test_successes = 0
return tx.value <= self.max_test_value
return False
case HALF_OPEN:
return tx.value <= self.max_test_value
def record_success(self):
if self.state == HALF_OPEN:
self.test_successes += 1
if self.test_successes >= self.test_tx_required:
self.state = CLOSED
self.consecutive_failures = 0ML Pipeline
Four-model ensemble, one threshold
The ML anomaly detection layer combines four orthogonal detection models into a weighted ensemble. Each model outputs a score in [0, 1]. The combined threshold of 0.6 triggers investigation.
score = 2^(−E[h(x)] / c(n))
n_trees = 50, max_samples = 128
Detects anomalies by isolating observations via random partitioning. Anomalies require fewer splits.
surprise = −log₂(P(sₜ|sₜ₋₁)) / log₂(n)
n_states = dynamic, window = 100
Models agent behavior as state transitions. High surprise indicates deviation from learned patterns.
KL(P_ref ‖ P_curr)
ref_window = 1000, curr_window = 50
Measures distributional shift between historical reference and current behavioral window via KL-divergence.
|value − μ| / σ / 5.0
rolling_mean, rolling_std
Classical outlier detection normalized to [0,1]. Captures sudden deviations in any numeric metric.
Combined Threshold
Weighted sum of all four model outputs. Transactions exceeding this threshold trigger investigation.
0.6
threshold
class AnomalyDetectionEnsemble:
"""Weighted ensemble of four orthogonal anomaly detectors."""
def __init__(self):
self.models = {
"isolation_forest": (IsolationForest(n_trees=50, max_samples=128), 0.30),
"markov_behavioral": (MarkovBehavioralModel(), 0.25),
"entropy_drift": (EntropyDriftDetector(ref=1000, curr=50), 0.20),
"statistical_zscore": (StatisticalZScore(), 0.25),
}
self.threshold = 0.6
def score(self, features: FeatureVector) -> float:
combined = sum(
model.score(features) * weight
for model, weight in self.models.values()
)
return min(combined, 1.0)
def is_anomalous(self, features: FeatureVector) -> bool:
return self.score(features) >= self.threshold
class IsolationForest:
def score(self, features: FeatureVector) -> float:
avg_path = mean(tree.path_length(features) for tree in self.trees)
return 2 ** (-avg_path / self._c(self.max_samples))
class MarkovBehavioralModel:
def score(self, features: FeatureVector) -> float:
prob = self.transition_matrix[features.prev_state][features.curr_state]
surprise = -log2(max(prob, 1e-10)) / log2(self.n_states)
return min(surprise, 1.0)Counterparty Risk
Nine signals, one profile
Counterparty risk is assessed by evaluating nine independent signals about the transacting agent. A revoked identity results in an instant block (score = 1.0). All other factors are additive.
| Condition | Penalty | Severity |
|---|---|---|
| Reputation < 0.3 | +0.35 | HIGH |
| Reputation < 0.6 | +0.15 | MEDIUM |
| Account age < 24h | +0.25 | HIGH |
| Account age < 1 week | +0.10 | LOW |
| Transaction history < 10 | +0.15 | MEDIUM |
| Dispute rate > 10% | +0.30 | HIGH |
| Value ratio > 10× avg | +0.20 | MEDIUM |
| Delegation depth > 3 | +0.10 | LOW |
| Status: REVOKED | = 1.0 | BLOCKED |
def assess_counterparty(agent: Agent) -> float:
"""Additive counterparty risk score. Revoked = instant 1.0."""
if agent.status == AgentStatus.REVOKED:
return 1.0
score = 0.0
# Reputation signals
if agent.reputation < 0.3:
score += 0.35
elif agent.reputation < 0.6:
score += 0.15
# Account age signals
age = now() - agent.created_at
if age < timedelta(hours=24):
score += 0.25
elif age < timedelta(weeks=1):
score += 0.10
# Transaction history depth
if agent.total_tx_count < 10:
score += 0.15
# Dispute rate
if agent.total_tx_count > 0:
dispute_rate = agent.dispute_count / agent.total_tx_count
if dispute_rate > 0.10:
score += 0.30
# Value anomaly (relative to agent's average)
if agent.avg_tx_value > 0:
if tx.value > 10.0 * agent.avg_tx_value:
score += 0.20
# Delegation depth
if len(agent.delegation_chain) > 3:
score += 0.10
return min(score, 1.0)Insurance Layer
On-chain insurance primitives
The InsurancePool provides protocol-native risk transfer. Premiums are dynamically priced based on reputation and dispute history. High-reputation agents pay less; dispute-prone agents pay more.
premium = base_rate × rep_discount × dispute_mult
base_rate
Pool-configured base premium rate
rep_discount
Reputation-based multiplier
dispute_mult
Dispute history adjustment
rep >= 0.8
High-trust agent
0.5 <= rep < 0.8
Standard agent
rep < 0.5
Low-trust agent
solvency = staked / outstanding_claims
The pool must maintain solvency > 1.0 at all times. New policies are rejected if accepting them would drop the solvency ratio below the safety threshold.
Solvency < 1.5 triggers capital call to stakers
class InsurancePool:
total_staked: u128
total_outstanding_claims: u128
base_premium_rate: float
def calculate_premium(self, agent: Agent, coverage: u128) -> u128:
"""Dynamic premium: base × reputation_discount × dispute_multiplier."""
# Reputation discount
if agent.reputation >= 0.8:
rep_discount = 0.7 # 30% discount for trusted agents
elif agent.reputation < 0.5:
rep_discount = 1.5 # 50% surcharge for risky agents
else:
rep_discount = 1.0
# Dispute history multiplier
if agent.total_tx_count > 0:
dispute_rate = agent.dispute_count / agent.total_tx_count
dispute_mult = 1.0 + (dispute_rate * 5.0) # linear scaling
else:
dispute_mult = 1.2 # unknown history surcharge
premium = coverage * self.base_premium_rate * rep_discount * dispute_mult
return max(premium, self.minimum_premium)
@property
def solvency_ratio(self) -> float:
if self.total_outstanding_claims == 0:
return float('inf')
return self.total_staked / self.total_outstanding_claims
def can_issue_policy(self, coverage: u128) -> bool:
future_claims = self.total_outstanding_claims + coverage
return self.total_staked / future_claims >= 1.5Network Health
Real-time system metrics
The risk engine continuously monitors aggregate network health. These metrics feed back into individual risk assessments, creating a feedback loop that tightens controls when the network is under stress.
Open Breakers
open_circuit_breakers
Agents currently in OPEN or HALF_OPEN state
Avg Risk (1h)
avg_risk_score_1h
Rolling 1-hour average composite risk score
Volume (24h)
total_volume_24h
Aggregate transaction volume over 24 hours
Tx Count (24h)
total_tx_count_24h
Total transactions processed in 24 hours
class NetworkHealthMonitor:
"""Aggregate network health metrics for feedback-driven risk adjustment."""
def snapshot(self) -> NetworkHealth:
agents = self.registry.all_agents()
open_breakers = sum(
1 for a in agents
if a.circuit_breaker.state != CircuitBreakerState.CLOSED
)
recent_tx = self.ledger.transactions_since(now() - timedelta(hours=1))
avg_risk_1h = mean(tx.risk_score for tx in recent_tx) if recent_tx else 0.0
daily_tx = self.ledger.transactions_since(now() - timedelta(hours=24))
total_volume_24h = sum(tx.value for tx in daily_tx)
total_count_24h = len(daily_tx)
return NetworkHealth(
open_circuit_breakers=open_breakers,
avg_risk_score_1h=avg_risk_1h,
total_volume_24h=total_volume_24h,
total_tx_count_24h=total_count_24h,
timestamp=now(),
)
def stress_multiplier(self) -> float:
"""Increase risk sensitivity when the network is under stress."""
health = self.snapshot()
breaker_ratio = health.open_circuit_breakers / max(self.total_agents, 1)
if breaker_ratio > 0.1: # >10% agents tripped
return 1.5
if health.avg_risk_score_1h > 0.5:
return 1.3
return 1.0Related deep dives