Building a Jurisdiction-Aware Intake Router in Python

Data Subject Request (DSR) pipelines operate under strict temporal and regulatory constraints that leave zero margin for routing ambiguity. The intake layer functions as the primary compliance control plane, where raw consumer submissions are transformed into deterministic, auditable workflows. A jurisdiction-aware intake router in Python must resolve conflicting residency signals, normalize heterogeneous request taxonomies, and enforce strict SLA boundaries before any personally identifiable information touches downstream processing systems. This architecture eliminates manual triage bottlenecks, establishes verifiable audit trails, and ensures that privacy engineering teams can rapidly isolate and remediate routing anomalies without breaching statutory deadlines.

The router resolves each payload through gating, multi-signal jurisdiction resolution, SLA computation, and a three-tier fallback before handing off to fulfillment:

flowchart TD
    A["Intake payload"] --> B{"Schema valid?"}
    B -->|no| R["Reject at edge"]
    B -->|yes| C["Assign correlation ID"]
    C --> D["Resolve jurisdiction from signals"]
    D --> E{"Signal confidence >= 0.60"}
    E -->|no| T["Manual triage - SLA paused"]
    E -->|yes| F["Compute SLA deadline"]
    F --> G{"KMS envelope encryption"}
    G -->|error| K["Encryption DLQ - retry backoff"]
    G -->|ok| H["Route to fulfillment worker"]

1. Deterministic Payload Gating & Idempotency Enforcement

At the core of the DSR Architecture & Intake Routing specification lies a deterministic gating engine that evaluates incoming payloads against a strict schema before committing them to any downstream queue. Python’s dataclasses and typing modules provide the structural foundation, but production systems require runtime validation with strict coercion boundaries. By leveraging Pydantic v2, the router rejects malformed submissions at the edge, preventing silent failures in downstream fulfillment systems.

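Every valid intake event must be assigned a cryptographically secure correlation ID immediately upon receipt. This ID propagates through every routing decision, SLA calculation, and compliance audit log. Because a random ID is minted per submission, it cannot detect a resubmission on its own: duplicate detection needs an idempotency key that is either supplied by the client or derived deterministically from the request content. Together, the two identifiers make duplicate submissions debuggable and ensure that retry logic never inflates SLA clocks or triggers redundant data extraction jobs.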

import uuid
import logging
from typing import Optional, Literal
from datetime import datetime, timezone
from pydantic import BaseModel, Field, field_validator, ConfigDict

logger = logging.getLogger("dsr.intake_router")

class DSRIntakePayload(BaseModel):
    model_config = ConfigDict(strict=True, extra="forbid")
    
    request_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    consumer_email: str
    explicit_jurisdiction: Optional[str] = None
    request_type: Literal["access", "deletion", "rectification", "opt_out_sale"]
    raw_ip: Optional[str] = None
    billing_country: Optional[str] = None
    submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_validator("consumer_email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        if "@" not in v or len(v) > 254:
            raise ValueError("Invalid consumer identifier format")
        return v.lower()
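
One way to get a deterministic idempotency key alongside the random correlation ID is sketched below. This is an illustration under stated assumptions, not part of the router as specified: the function name derive_idempotency_key, the 24-hour window, the HMAC secret, and the InMemoryDedupStore class are all hypothetical.

```python
import hashlib
import hmac
from datetime import datetime, timezone


def derive_idempotency_key(
    secret: bytes,
    consumer_email: str,
    request_type: str,
    submitted_at: datetime,
    window_hours: int = 24,
) -> str:
    """Return a stable key for 'same consumer, same request type, same time window'."""
    # Bucket the timestamp so retries inside one window collapse to a single key.
    epoch_seconds = int(submitted_at.astimezone(timezone.utc).timestamp())
    bucket = epoch_seconds // (window_hours * 3600)
    message = f"{consumer_email.strip().lower()}|{request_type}|{bucket}".encode()
    # HMAC instead of a bare hash: without the secret, the key cannot be
    # brute-forced back to an email address.
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


class InMemoryDedupStore:
    """Hypothetical stand-in for a shared store (e.g. a table with a unique index)."""

    def __init__(self) -> None:
        self._claimed: dict[str, str] = {}

    def claim(self, idempotency_key: str, correlation_id: str) -> str:
        # First caller wins; later callers get the original correlation ID back.
        return self._claimed.setdefault(idempotency_key, correlation_id)
```

A retry that lands just after a window boundary gets a new key, so the window length is a trade-off between catching slow retries and merging genuinely separate requests.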

2. Multi-Signal Jurisdiction Resolution

Jurisdiction resolution requires parsing multiple residency indicators: IP geolocation, billing address, account profile metadata, and explicit consumer declarations. When these signals align, routing is trivial. When they diverge, the router applies a precedence matrix: the highest-confidence signal wins, and signals of equal confidence default to the strictest applicable regulation. This deterministic evaluation is detailed in the Jurisdiction Routing Logic specification, but the implementation requires explicit signal weighting and conflict resolution.

from enum import Enum
from dataclasses import dataclass

class Jurisdiction(Enum):
    GDPR = "gdpr"
    CCPA_CPRA = "ccpa_cpra"
    VCDPA = "vcdpa"
    UNKNOWN = "unknown"

@dataclass(frozen=True)
class JurisdictionSignal:
    source: str
    jurisdiction: Jurisdiction
    confidence: float  # 0.0 to 1.0

def resolve_primary_jurisdiction(payload: DSRIntakePayload) -> tuple[Jurisdiction, list[dict]]:
    signals: list[JurisdictionSignal] = []
    
    # 1. Explicit declaration (highest weight)
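    if payload.explicit_jurisdiction:
        try:
            declared = Jurisdiction(payload.explicit_jurisdiction.strip().lower())
        except ValueError:
            # Unrecognized value: keep it in the audit trail as UNKNOWN instead of raising
            declared = Jurisdiction.UNKNOWN
        signals.append(JurisdictionSignal("explicit", declared, 0.95))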
        
    # 2. Billing country (medium weight)
    if payload.billing_country:
        mapping = {"GB": Jurisdiction.GDPR, "DE": Jurisdiction.GDPR, "US-CA": Jurisdiction.CCPA_CPRA}
        signals.append(JurisdictionSignal("billing", mapping.get(payload.billing_country, Jurisdiction.UNKNOWN), 0.70))
        
    # 3. IP geolocation (lower weight, fallback)
    if payload.raw_ip:
        # In production, call internal GeoIP service here
        signals.append(JurisdictionSignal("ip_geo", Jurisdiction.UNKNOWN, 0.50))
        
    # Tie-break order when confidences are equal: GDPR > CCPA/CPRA > VCDPA > UNKNOWN
    strictness_order = [Jurisdiction.GDPR, Jurisdiction.CCPA_CPRA, Jurisdiction.VCDPA, Jurisdiction.UNKNOWN]

    # Record every signal, including UNKNOWN ones, before filtering
    audit_trail = [{"source": s.source, "jurisdiction": s.jurisdiction.value, "weight": s.confidence} for s in signals]

    # Only signals that map to a known regulation can win
    valid_signals = [s for s in signals if s.jurisdiction != Jurisdiction.UNKNOWN]
    if not valid_signals:
        return Jurisdiction.UNKNOWN, audit_trail

    # Highest confidence wins; equal confidence falls back to the stricter regulation
    resolved = max(valid_signals, key=lambda s: (s.confidence, -strictness_order.index(s.jurisdiction)))
    return resolved.jurisdiction, audit_trail
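
The resolver above returns a jurisdiction and an audit trail but no single confidence figure, while the fallback tier later in this article compares a confidence against 0.60. The sketch below shows one possible way to derive such a figure. The agreement-weighted heuristic and the name score_resolution are assumptions made for illustration, not something the routing specification defines; signals are passed as plain tuples to keep the example self-contained, and the strictness tie-break is omitted.

```python
from collections import defaultdict


def score_resolution(signals: list[tuple[str, str, float]]) -> tuple[str, float]:
    """signals holds (source, jurisdiction, confidence); 'unknown' entries are ignored."""
    known = [(src, jur, conf) for src, jur, conf in signals if jur != "unknown"]
    if not known:
        return "unknown", 0.0

    # Sum the confidence backing each candidate jurisdiction.
    support: dict[str, float] = defaultdict(float)
    for _, jur, conf in known:
        support[jur] += conf

    # The winner is the jurisdiction of the single strongest signal.
    _, winner, top_conf = max(known, key=lambda s: s[2])

    # Scale the winning confidence by the share of total weight that agrees with it,
    # so conflicting signals pull the score down.
    agreement = support[winner] / sum(support.values())
    return winner, round(top_conf * agreement, 4)
```

Under this heuristic a lone explicit declaration keeps its full 0.95, whereas an explicit GDPR declaration contradicted by a CCPA/CPRA billing signal scores below 0.60 and would be sent to manual triage.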

3. SLA Calculation & Statutory Tolling

Once jurisdiction is resolved, the router must compute the statutory response deadline. For example, a consumer with a California IP address but a UK billing profile triggers a dual-scope evaluation. The router calculates both the GDPR response window (one month under Article 12(3), approximated here as 30 days) and the 45-day CCPA/CPRA window, then applies the shorter deadline while flagging the payload for compliance officer review.

This deterministic SLA mapping relies on Python’s datetime and zoneinfo modules for timezone-aware arithmetic. Both principal windows run in calendar days. Business-day and public-holiday arithmetic applies only to secondary deadlines, such as the CCPA regulations’ 10-business-day confirmation of receipt, and needs a holiday calendar that the standard library does not provide. The implementation must also handle statutory tolling and extensions (e.g., CCPA’s 45+45 day extension with consumer notice, GDPR’s two-month extension for complex or numerous requests) without breaking the initial SLA clock.

from datetime import timedelta
from zoneinfo import ZoneInfo

# Statutory response windows in calendar days (GDPR's "one month" approximated as 30)
BASE_WINDOW_DAYS = {
    Jurisdiction.GDPR: 30,
    Jurisdiction.CCPA_CPRA: 45,
    Jurisdiction.VCDPA: 45,
}

def calculate_sla_deadline(jurisdiction: Jurisdiction, submitted_at: datetime, tolling_days: int = 0) -> datetime:
    if submitted_at.tzinfo is None:
        raise ValueError("submitted_at must be timezone-aware")

    # Unknown jurisdictions fall back to the shortest window so no deadline is overstated
    base_days = BASE_WINDOW_DAYS.get(jurisdiction, min(BASE_WINDOW_DAYS.values()))

    # Response windows run in calendar days, so weekends and holidays are not skipped
    start = submitted_at.astimezone(ZoneInfo("UTC"))
    return start + timedelta(days=base_days + tolling_days)

Note: Where a deadline is counted in business days, jurisdiction-specific public holidays require a maintained calendar library such as holidays or workalendar. python-dateutil and the standard-library zoneinfo module handle date arithmetic and time zones but ship no holiday calendars.
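
The dual-scope evaluation and the extension handling described in this section can be sketched as follows. This is a minimal illustration assuming calendar-day windows, with GDPR's one month and two-month extension approximated as 30 and 60 days. SLAClock, governing_clock, and the two lookup tables are hypothetical names, and regimes are plain strings so the block stands alone.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

# Calendar-day approximations used only for this sketch.
WINDOW_DAYS = {"gdpr": 30, "ccpa_cpra": 45}
MAX_EXTENSION_DAYS = {"gdpr": 60, "ccpa_cpra": 45}


@dataclass
class SLAClock:
    regime: str
    submitted_at: datetime
    extension_days: int = 0
    extension_reason: Optional[str] = None

    @property
    def initial_deadline(self) -> datetime:
        # Derived from submitted_at only, so an extension never rewrites it.
        return self.submitted_at + timedelta(days=WINDOW_DAYS[self.regime])

    @property
    def effective_deadline(self) -> datetime:
        return self.initial_deadline + timedelta(days=self.extension_days)

    def extend(self, days: int, reason: str) -> None:
        if self.extension_days:
            raise ValueError("an extension has already been applied")
        if not 0 < days <= MAX_EXTENSION_DAYS[self.regime]:
            raise ValueError("extension exceeds the limit for this regime")
        self.extension_days = days
        self.extension_reason = reason


def governing_clock(submitted_at: datetime, regimes: list[str]) -> tuple[SLAClock, bool]:
    """Return the clock with the earliest deadline and a compliance-review flag."""
    clocks = [SLAClock(regime, submitted_at) for regime in regimes]
    shortest = min(clocks, key=lambda c: c.initial_deadline)
    # More than one applicable regime means a compliance officer should review.
    return shortest, len(clocks) > 1
```

Keeping the initial deadline as a derived property, rather than a stored field that an extension overwrites, is what lets the audit trail report both the original and the extended date.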

4. Secure PII Envelope & Taxonomy Normalization

Secure PII handling begins at the intake boundary. The router never stores raw consumer identifiers in plaintext. Instead, it applies envelope encryption using a hardware-backed KMS, routing only the encrypted payload reference and metadata to the processing queue. Taxonomy normalization occurs concurrently, mapping heterogeneous consumer inputs to a canonical internal schema.

import os
import json
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class SecureIntakeEnvelope:
    def __init__(self, kms_client):
        self.kms = kms_client
        
    def wrap_payload(self, payload: DSRIntakePayload, jurisdiction: Jurisdiction, deadline: datetime) -> dict:
        # 1. Serialize raw PII
        raw_pii = json.dumps({"email": payload.consumer_email, "request_id": payload.request_id}).encode()
        
        # 2. Generate DEK and encrypt (envelope pattern)
        dek = AESGCM.generate_key(bit_length=256)
        aesgcm = AESGCM(dek)
        nonce = os.urandom(12)
        ciphertext = aesgcm.encrypt(nonce, raw_pii, None)
        
        # 3. Encrypt DEK with KMS
        encrypted_dek = self.kms.encrypt(KeyId="alias/dsr-intake-key", Plaintext=dek)
        
        # 4. Construct routing envelope (NO raw PII)
        return {
            "correlation_id": payload.request_id,
            "jurisdiction": jurisdiction.value,
            "sla_deadline": deadline.isoformat(),
            "request_type": payload.request_type,
            # CiphertextBlob is raw bytes; base64-encode it so the envelope is JSON-serializable
            "encrypted_dek_b64": base64.b64encode(encrypted_dek["CiphertextBlob"]).decode(),
            "ciphertext_b64": base64.b64encode(nonce + ciphertext).decode(),
            "kms_key_id": encrypted_dek["KeyId"]
        }

For cryptographic implementation standards, align with NIST SP 800-57 Part 1 Rev. 5 guidelines on key management and envelope encryption practices.
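
The taxonomy normalization mentioned at the start of this section has no code of its own, so the sketch below illustrates one simple approach: pattern matching against a phrase table, with ambiguous or unmatched input left unresolved. The phrase table, the function name normalize_request_type, and the NORMALIZED status are assumptions for illustration only.

```python
import re
from typing import Optional

# Hypothetical phrase table. "remove my data" is listed twice on purpose,
# because it can mean either deletion or an opt-out.
CANONICAL_PATTERNS = {
    "access": [r"\bcopy of my data\b", r"\baccess\b", r"\bwhat data\b"],
    "deletion": [r"\bdelete\b", r"\berase\b", r"\bremove my data\b"],
    "rectification": [r"\bcorrect\b", r"\bupdate my\b", r"\bfix my\b"],
    "opt_out_sale": [r"\bdo not sell\b", r"\bopt[ -]?out\b", r"\bremove my data\b"],
}


def normalize_request_type(free_text: str) -> tuple[Optional[str], str]:
    """Map consumer wording to a canonical request type, or flag it for clarification."""
    text = free_text.strip().lower()
    matches = [
        canonical
        for canonical, patterns in CANONICAL_PATTERNS.items()
        if any(re.search(pattern, text) for pattern in patterns)
    ]
    # Exactly one canonical type must match; zero or several means a human
    # or the consumer has to disambiguate.
    if len(matches) == 1:
        return matches[0], "NORMALIZED"
    return None, "REQUIRES_CLARIFICATION"
```

Refusing to guess on multiple matches keeps the normalizer deterministic: the same wording always produces the same outcome, and ambiguity surfaces as an explicit state rather than a silent misroute.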

5. Fallback Routing & Escalation Workflows

No intake system operates in a vacuum. Network partitions, KMS throttling, or ambiguous jurisdictional signals require deterministic fallback routing. The router implements a three-tier escalation matrix:

  1. Circuit-Breaker Fallback: If the KMS or GeoIP resolver exceeds timeout thresholds, the payload is routed to a regional dead-letter queue (DLQ) with a PENDING_ENCRYPTION state. A background worker retries with exponential backoff.
  2. Jurisdictional Ambiguity: When signal confidence falls below 0.60 or conflicts cannot be resolved via the precedence matrix, the router defaults to Jurisdiction.UNKNOWN and routes to a MANUAL_TRIAGE queue. The SLA clock is paused until a compliance officer assigns a definitive jurisdiction.
  3. Taxonomy Mismatch: If request_type normalization fails (e.g., consumer submits “remove my data” which maps ambiguously between deletion and opt-out), the payload is flagged with a REQUIRES_CLARIFICATION status. Automated email templates are dispatched to the consumer, and the intake event is parked in a CLARIFICATION_HOLD state.
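
class FallbackRouter:
    def __init__(self, publish):
        # publish(queue_name, message): injected callable wrapping the real broker client
        self._publish = publish

    def route_with_fallback(self, envelope: dict, confidence: float) -> str:
        if confidence < 0.60:
            return self._send_to_queue("MANUAL_TRIAGE", envelope, status="AMBIGUOUS_JURISDICTION")
        if envelope.get("kms_error"):
            return self._send_to_queue("ENCRYPTION_DLQ", envelope, status="KMS_FAILURE")
        return self._send_to_queue("FULFILLMENT_WORKER", envelope, status="READY")

    def _send_to_queue(self, queue: str, envelope: dict, status: str) -> str:
        self._publish(queue, {**envelope, "status": status})  # tag message with routing status
        return queue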
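
The background retry from the first escalation tier can be sketched as exponential backoff with full jitter. The helper below is illustrative only: retry_with_backoff and its parameter defaults are assumptions, and the injectable sleep argument exists so the delay schedule can be tested without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds or max_attempts is exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            # A production worker should catch only transient errors
            # (timeouts, throttling) rather than every exception.
            if attempt == max_attempts - 1:
                raise
            # Delay ceiling doubles each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: pick a random delay up to the ceiling so that
            # many workers do not retry in lockstep.
            sleep(random.uniform(0, ceiling))
    raise RuntimeError("unreachable")
```

A worker draining the encryption DLQ could wrap its KMS call as retry_with_backoff(lambda: envelope_builder.wrap_payload(payload, jurisdiction, deadline)), where envelope_builder is a SecureIntakeEnvelope instance.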

6. Observability & Debugging Routing Anomalies

The routing decision is serialized into an immutable JSON audit record, capturing the exact signal weights, jurisdictional overrides, and computed deadline. Privacy engineering teams rely on this structured audit trail to rapidly isolate routing anomalies. Each log entry must include:

  • correlation_id
  • signal_matrix_snapshot
  • jurisdiction_override_reason
  • sla_computed_at
  • routing_destination_queue
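
The fields listed above can be assembled into a record along the following lines. This is a sketch, not the audit store's actual schema: the class name RoutingAuditRecord, the field types, and the SHA-256 digest used as a tamper-evidence aid are assumptions.

```python
import hashlib
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class RoutingAuditRecord:
    correlation_id: str
    signal_matrix_snapshot: tuple  # tuple of per-signal dicts from the resolver
    jurisdiction_override_reason: Optional[str]
    sla_computed_at: str  # ISO 8601 timestamp
    routing_destination_queue: str

    def to_json(self) -> str:
        # Sorted keys and fixed separators give a canonical byte representation,
        # so the same record always serializes identically.
        return json.dumps(asdict(self), sort_keys=True, separators=(",", ":"))

    def digest(self) -> str:
        # Storing this hash next to the record lets later readers detect edits.
        return hashlib.sha256(self.to_json().encode()).hexdigest()
```

A frozen dataclass only prevents accidental mutation inside the process; immutability of the stored record still depends on the audit store itself, for example append-only or write-once storage.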

When debugging duplicate submissions or SLA drift, engineers query the audit store using the correlation ID. Idempotency checks at the ingress gateway prevent double-processing, while structured logging ensures that every state transition is traceable across distributed queues. By enforcing strict schema validation, deterministic precedence matrices, and envelope encryption at the boundary, the jurisdiction-aware intake router transforms regulatory complexity into a predictable, auditable engineering workflow.