Deep Dive

Risk Management Engine

Five-factor composite risk scoring, ML-powered anomaly detection with an ensemble of Isolation Forest, Markov behavioral models, entropy drift detection, and statistical z-scores—backed by circuit breakers, counterparty profiling, and on-chain insurance primitives.

5-Factor CompositeML EnsembleCircuit BreakersInsurance Primitives

Composite Scoring

Five weighted factors, one score

Every transaction is evaluated against a composite risk score computed from five orthogonal dimensions. Each factor contributes a weighted percentage to the final score in the range [0, 1].

Authority Bounds Compliance30%
Circuit Breaker Status25%
Behavioral Anomaly Score20%
Counterparty Risk15%
Concentration Risk10%

Combined weight distribution

30%
25%
20%
15%
10%
Authority Bounds Compliance
Circuit Breaker Status
Behavioral Anomaly Score
Counterparty Risk
Concentration Risk
risk_scoring.py
def compute_composite_risk(tx: Transaction, agent: Agent) -> float:
    """Weighted five-factor risk score in [0, 1]."""
    factors = {
        "authority_compliance": check_authority_bounds(tx, agent),   # 0.30
        "circuit_breaker":     get_circuit_breaker_score(agent),     # 0.25
        "behavioral_anomaly":  anomaly_score(tx, agent),            # 0.20
        "counterparty_risk":   assess_counterparty(tx.counterparty),# 0.15
        "concentration_risk":  concentration_score(tx, agent),      # 0.10
    }

    WEIGHTS = {
        "authority_compliance": 0.30,
        "circuit_breaker":     0.25,
        "behavioral_anomaly":  0.20,
        "counterparty_risk":   0.15,
        "concentration_risk":  0.10,
    }

    composite = sum(factors[k] * WEIGHTS[k] for k in factors)
    return min(max(composite, 0.0), 1.0)

Risk Levels

Six thresholds from safe to blocked

The composite risk score maps to six discrete levels. Each level triggers a different protocol response—from silent pass-through to complete transaction blocking.

0.01.0
MINIMAL
LOW
MODERATE
HIGH
CRITICAL
BLOCKED

MINIMAL

<0.1

LOW

0.1–0.3

MODERATE

0.3–0.5

HIGH

0.5–0.7

CRITICAL

0.7–0.9

BLOCKED

≥0.9

risk_levels.py
class RiskLevel(Enum):
    MINIMAL  = "minimal"    # < 0.1  — silent pass-through
    LOW      = "low"        # 0.1–0.3 — log only
    MODERATE = "moderate"   # 0.3–0.5 — require additional verification
    HIGH     = "high"       # 0.5–0.7 — hold for manual review
    CRITICAL = "critical"   # 0.7–0.9 — auto-reject, alert operator
    BLOCKED  = "blocked"    # ≥ 0.9  — hard block, freeze agent

def classify_risk(score: float) -> RiskLevel:
    if score < 0.1: return RiskLevel.MINIMAL
    if score < 0.3: return RiskLevel.LOW
    if score < 0.5: return RiskLevel.MODERATE
    if score < 0.7: return RiskLevel.HIGH
    if score < 0.9: return RiskLevel.CRITICAL
    return RiskLevel.BLOCKED

Behavioral Analysis

Anomaly scoring algorithm

The behavioral anomaly detector computes an additive score from six independent triggers. Each trigger contributes a fixed penalty. The sum is capped at 1.0 and feeds into the composite risk score with a 20% weight.

z-score > 3σ+0.30

Extreme statistical outlier in transaction behavior

Volume spike (3× daily avg)+0.25

Sudden increase in transaction volume

Velocity spike (3× daily count)+0.20

Rapid burst of transaction frequency

z-score > 2σ+0.15

Moderate statistical deviation from baseline

New counterparty+0.15

First-time interaction with unknown agent

Unusual tx type+0.10

Transaction type not seen in recent history

anomaly_score.py
def anomaly_score(tx: Transaction, agent: Agent) -> float:
    """Additive behavioral anomaly score, capped at 1.0."""
    score = 0.0
    stats = agent.behavioral_stats

    # Statistical z-score thresholds
    z = abs(tx.value - stats.mean_value) / stats.std_value
    if z > 3.0:
        score += 0.30   # extreme outlier
    elif z > 2.0:
        score += 0.15   # moderate deviation

    # Counterparty novelty
    if tx.counterparty not in agent.known_counterparties:
        score += 0.15

    # Transaction type novelty
    if tx.tx_type not in stats.recent_tx_types:
        score += 0.10

    # Volume spike detection
    if tx.value > 3.0 * stats.daily_avg_volume:
        score += 0.25

    # Velocity spike detection
    if stats.tx_count_last_hour > 3.0 * stats.hourly_avg_count:
        score += 0.20

    return min(score, 1.0)

Circuit Breakers

Three states, automatic recovery

Circuit breakers protect the network from cascading failures. When an agent accumulates 5+ consecutive failures, transactions are blocked. After a 300-second cooldown, the breaker enters a half-open state allowing limited test transactions.

CLOSED

Normal operation

failures >= 5

OPEN

All transactions blocked

timeout 300s

HALF_OPEN

Testing with limited tx

3 test tx success

→ CLOSED

any failure

→ OPEN

Configuration

failure_threshold

Consecutive failures before opening

5

reset_timeout

Cooldown before half-open state

300s

test_tx_required

Successful tests to close

3

max_test_value

Max value during half-open test

100
circuit_breaker.py
class CircuitBreakerState(Enum):
    CLOSED    = "closed"     # normal operation
    OPEN      = "open"       # all transactions blocked
    HALF_OPEN = "half_open"  # testing recovery

class CircuitBreaker:
    failure_threshold: int = 5
    reset_timeout: float = 300.0  # seconds
    test_tx_required: int = 3

    def record_failure(self):
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.failure_threshold:
            self.state = CircuitBreakerState.OPEN
            self.opened_at = now()

    def can_execute(self, tx: Transaction) -> bool:
        match self.state:
            case CLOSED:
                return True
            case OPEN:
                if elapsed(self.opened_at) >= self.reset_timeout:
                    self.state = HALF_OPEN
                    self.test_successes = 0
                    return tx.value <= self.max_test_value
                return False
            case HALF_OPEN:
                return tx.value <= self.max_test_value

    def record_success(self):
        if self.state == HALF_OPEN:
            self.test_successes += 1
            if self.test_successes >= self.test_tx_required:
                self.state = CLOSED
                self.consecutive_failures = 0

ML Pipeline

Four-model ensemble, one threshold

The ML anomaly detection layer combines four orthogonal detection models into a weighted ensemble. Each model outputs a score in [0, 1]. The combined threshold of 0.6 triggers investigation.

Isolation Forest
w=0.30

score = 2^(−E[h(x)] / c(n))

n_trees = 50, max_samples = 128

Detects anomalies by isolating observations via random partitioning. Anomalies require fewer splits.

Markov Behavioral Model
w=0.25

surprise = −log₂(P(sₜ|sₜ₋₁)) / log₂(n)

n_states = dynamic, window = 100

Models agent behavior as state transitions. High surprise indicates deviation from learned patterns.

Entropy Drift Detector
w=0.20

KL(P_ref ‖ P_curr)

ref_window = 1000, curr_window = 50

Measures distributional shift between historical reference and current behavioral window via KL-divergence.

Statistical z-score
w=0.25

|value − μ| / σ / 5.0

rolling_mean, rolling_std

Classical outlier detection normalized to [0,1]. Captures sudden deviations in any numeric metric.

Combined Threshold

Weighted sum of all four model outputs. Transactions exceeding this threshold trigger investigation.

0.6

threshold

ml_ensemble.py
class AnomalyDetectionEnsemble:
    """Weighted ensemble of four orthogonal anomaly detectors."""

    def __init__(self):
        self.models = {
            "isolation_forest": (IsolationForest(n_trees=50, max_samples=128), 0.30),
            "markov_behavioral": (MarkovBehavioralModel(), 0.25),
            "entropy_drift":     (EntropyDriftDetector(ref=1000, curr=50), 0.20),
            "statistical_zscore": (StatisticalZScore(), 0.25),
        }
        self.threshold = 0.6

    def score(self, features: FeatureVector) -> float:
        combined = sum(
            model.score(features) * weight
            for model, weight in self.models.values()
        )
        return min(combined, 1.0)

    def is_anomalous(self, features: FeatureVector) -> bool:
        return self.score(features) >= self.threshold

class IsolationForest:
    def score(self, features: FeatureVector) -> float:
        avg_path = mean(tree.path_length(features) for tree in self.trees)
        return 2 ** (-avg_path / self._c(self.max_samples))

class MarkovBehavioralModel:
    def score(self, features: FeatureVector) -> float:
        prob = self.transition_matrix[features.prev_state][features.curr_state]
        surprise = -log2(max(prob, 1e-10)) / log2(self.n_states)
        return min(surprise, 1.0)

Counterparty Risk

Nine signals, one profile

Counterparty risk is assessed by evaluating nine independent signals about the transacting agent. A revoked identity results in an instant block (score = 1.0). All other factors are additive.

ConditionPenaltySeverity
Reputation < 0.3+0.35HIGH
Reputation < 0.6+0.15MEDIUM
Account age < 24h+0.25HIGH
Account age < 1 week+0.10LOW
Transaction history < 10+0.15MEDIUM
Dispute rate > 10%+0.30HIGH
Value ratio > 10× avg+0.20MEDIUM
Delegation depth > 3+0.10LOW
Status: REVOKED= 1.0BLOCKED
counterparty_risk.py
def assess_counterparty(agent: Agent) -> float:
    """Additive counterparty risk score. Revoked = instant 1.0."""
    if agent.status == AgentStatus.REVOKED:
        return 1.0

    score = 0.0

    # Reputation signals
    if agent.reputation < 0.3:
        score += 0.35
    elif agent.reputation < 0.6:
        score += 0.15

    # Account age signals
    age = now() - agent.created_at
    if age < timedelta(hours=24):
        score += 0.25
    elif age < timedelta(weeks=1):
        score += 0.10

    # Transaction history depth
    if agent.total_tx_count < 10:
        score += 0.15

    # Dispute rate
    if agent.total_tx_count > 0:
        dispute_rate = agent.dispute_count / agent.total_tx_count
        if dispute_rate > 0.10:
            score += 0.30

    # Value anomaly (relative to agent's average)
    if agent.avg_tx_value > 0:
        if tx.value > 10.0 * agent.avg_tx_value:
            score += 0.20

    # Delegation depth
    if len(agent.delegation_chain) > 3:
        score += 0.10

    return min(score, 1.0)

Insurance Layer

On-chain insurance primitives

The InsurancePool provides protocol-native risk transfer. Premiums are dynamically priced based on reputation and dispute history. High-reputation agents pay less; dispute-prone agents pay more.

Premium Calculation

premium = base_rate × rep_discount × dispute_mult

base_rate

Pool-configured base premium rate

rep_discount

Reputation-based multiplier

dispute_mult

Dispute history adjustment

Reputation Discount

rep >= 0.8

High-trust agent

0.7×

0.5 <= rep < 0.8

Standard agent

1.0×

rep < 0.5

Low-trust agent

1.5×
Solvency Ratio

solvency = staked / outstanding_claims

The pool must maintain solvency > 1.0 at all times. New policies are rejected if accepting them would drop the solvency ratio below the safety threshold.

Solvency < 1.5 triggers capital call to stakers

insurance_pool.py
class InsurancePool:
    total_staked: u128
    total_outstanding_claims: u128
    base_premium_rate: float

    def calculate_premium(self, agent: Agent, coverage: u128) -> u128:
        """Dynamic premium: base × reputation_discount × dispute_multiplier."""
        # Reputation discount
        if agent.reputation >= 0.8:
            rep_discount = 0.7   # 30% discount for trusted agents
        elif agent.reputation < 0.5:
            rep_discount = 1.5   # 50% surcharge for risky agents
        else:
            rep_discount = 1.0

        # Dispute history multiplier
        if agent.total_tx_count > 0:
            dispute_rate = agent.dispute_count / agent.total_tx_count
            dispute_mult = 1.0 + (dispute_rate * 5.0)  # linear scaling
        else:
            dispute_mult = 1.2   # unknown history surcharge

        premium = coverage * self.base_premium_rate * rep_discount * dispute_mult
        return max(premium, self.minimum_premium)

    @property
    def solvency_ratio(self) -> float:
        if self.total_outstanding_claims == 0:
            return float('inf')
        return self.total_staked / self.total_outstanding_claims

    def can_issue_policy(self, coverage: u128) -> bool:
        future_claims = self.total_outstanding_claims + coverage
        return self.total_staked / future_claims >= 1.5

Network Health

Real-time system metrics

The risk engine continuously monitors aggregate network health. These metrics feed back into individual risk assessments, creating a feedback loop that tightens controls when the network is under stress.

Open Breakers

open_circuit_breakers

Agents currently in OPEN or HALF_OPEN state

Avg Risk (1h)

avg_risk_score_1h

Rolling 1-hour average composite risk score

Volume (24h)

total_volume_24h

Aggregate transaction volume over 24 hours

Tx Count (24h)

total_tx_count_24h

Total transactions processed in 24 hours

network_health.py
class NetworkHealthMonitor:
    """Aggregate network health metrics for feedback-driven risk adjustment."""

    def snapshot(self) -> NetworkHealth:
        agents = self.registry.all_agents()

        open_breakers = sum(
            1 for a in agents
            if a.circuit_breaker.state != CircuitBreakerState.CLOSED
        )

        recent_tx = self.ledger.transactions_since(now() - timedelta(hours=1))
        avg_risk_1h = mean(tx.risk_score for tx in recent_tx) if recent_tx else 0.0

        daily_tx = self.ledger.transactions_since(now() - timedelta(hours=24))
        total_volume_24h = sum(tx.value for tx in daily_tx)
        total_count_24h = len(daily_tx)

        return NetworkHealth(
            open_circuit_breakers=open_breakers,
            avg_risk_score_1h=avg_risk_1h,
            total_volume_24h=total_volume_24h,
            total_tx_count_24h=total_count_24h,
            timestamp=now(),
        )

    def stress_multiplier(self) -> float:
        """Increase risk sensitivity when the network is under stress."""
        health = self.snapshot()
        breaker_ratio = health.open_circuit_breakers / max(self.total_agents, 1)
        if breaker_ratio > 0.1:      # >10% agents tripped
            return 1.5
        if health.avg_risk_score_1h > 0.5:
            return 1.3
        return 1.0