Building a Jurisdiction-Aware Intake Router in Python
Data Subject Request (DSR) pipelines operate under strict temporal and regulatory constraints that leave zero margin for routing ambiguity. The intake layer functions as the primary compliance control plane, where raw consumer submissions are transformed into deterministic, auditable workflows. A jurisdiction-aware intake router in Python must resolve conflicting residency signals, normalize heterogeneous request taxonomies, and enforce strict SLA boundaries before any personally identifiable information touches downstream processing systems. This architecture eliminates manual triage bottlenecks, establishes verifiable audit trails, and ensures that privacy engineering teams can rapidly isolate and remediate routing anomalies without breaching statutory deadlines.
The router resolves each payload through gating, multi-signal jurisdiction resolution, SLA computation, and a three-tier fallback before handing off to fulfillment:
flowchart TD
A["Intake payload"] --> B{"Schema valid?"}
B -->|no| R["Reject at edge"]
B -->|yes| C["Assign correlation ID"]
C --> D["Resolve jurisdiction from signals"]
D --> E{"Signal confidence >= 0.60"}
E -->|no| T["Manual triage - SLA paused"]
E -->|yes| F["Compute SLA deadline"]
F --> G{"KMS envelope encryption"}
G -->|error| K["Encryption DLQ - retry backoff"]
G -->|ok| H["Route to fulfillment worker"]
1. Deterministic Payload Gating & Idempotency Enforcement
At the core of the DSR Architecture & Intake Routing specification lies a deterministic gating engine that evaluates incoming payloads against a strict schema before committing them to any downstream queue. Python’s dataclasses and typing modules provide the structural foundation, but production systems require runtime validation with strict coercion boundaries. By leveraging Pydantic v2, the router rejects malformed submissions at the edge, preventing silent failures in downstream fulfillment systems.
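Every valid intake event must be assigned a correlation ID (a random UUIDv4 in the model below) immediately upon receipt. This identifier propagates through every routing decision, SLA calculation, and compliance audit log. To serve as an idempotency key it must stay stable across retries, which means the client supplies it or the ingress gateway reuses the ID of the first delivery; a fresh UUID per delivery cannot detect duplicates on its own. A stable key is what lets engineers debug duplicate submissions and keeps retry logic from inflating SLA clocks or triggering redundant data extraction jobs.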
import uuid
import logging
from typing import Optional, Literal
from datetime import datetime, timezone
from pydantic import BaseModel, Field, field_validator, ConfigDict

logger = logging.getLogger("dsr.intake_router")

class DSRIntakePayload(BaseModel):
    model_config = ConfigDict(strict=True, extra="forbid")

    request_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    consumer_email: str
    explicit_jurisdiction: Optional[str] = None
    request_type: Literal["access", "deletion", "rectification", "opt_out_sale"]
    raw_ip: Optional[str] = None
    billing_country: Optional[str] = None
    submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_validator("consumer_email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        if "@" not in v or len(v) > 254:
            raise ValueError("Invalid consumer identifier format")
        return v.lower()
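The idempotency requirement described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the router's actual mechanism: `derive_idempotency_key` and `IdempotencyRegistry` are hypothetical names, hashing the lowercased email together with the request type is one possible choice of stable key, and the in-memory dict stands in for whatever shared store a deployment would use.

```python
import hashlib
import uuid


def derive_idempotency_key(consumer_email: str, request_type: str) -> str:
    """Hash the stable parts of a submission so retries map to the same key."""
    material = f"{consumer_email.strip().lower()}|{request_type}".encode("utf-8")
    return hashlib.sha256(material).hexdigest()


class IdempotencyRegistry:
    """In-memory stand-in for a shared deduplication store (hypothetical)."""

    def __init__(self) -> None:
        self._seen: dict[str, str] = {}

    def register(self, consumer_email: str, request_type: str) -> tuple[str, bool]:
        """Return (correlation_id, is_new). Duplicates reuse the first correlation ID."""
        key = derive_idempotency_key(consumer_email, request_type)
        if key in self._seen:
            return self._seen[key], False
        correlation_id = str(uuid.uuid4())
        self._seen[key] = correlation_id
        return correlation_id, True


registry = IdempotencyRegistry()
first_id, first_is_new = registry.register("Jane@Example.com", "deletion")
retry_id, retry_is_new = registry.register("jane@example.com", "deletion")
```

Because the retry resolves to the first correlation ID, it would not start a second SLA clock. A real deployment would probably also bound the deduplication window, since a consumer may legitimately file the same request type again later.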
2. Multi-Signal Jurisdiction Resolution
Jurisdiction resolution requires parsing multiple residency indicators: IP geolocation, billing address, account profile metadata, and explicit consumer declarations. When these signals align, routing is trivial. When they diverge, the router needs an explicit, deterministic rule. The implementation below weights each signal by source, selects the highest-confidence signal, and uses a strictness precedence (GDPR > CCPA/CPRA > VCDPA) to break ties in favor of the strictest applicable regulation. The full precedence matrix is detailed in the Jurisdiction Routing Logic specification.
from enum import Enum
from dataclasses import dataclass

class Jurisdiction(Enum):
    GDPR = "gdpr"
    CCPA_CPRA = "ccpa_cpra"
    VCDPA = "vcdpa"
    UNKNOWN = "unknown"

@dataclass(frozen=True)
class JurisdictionSignal:
    source: str
    jurisdiction: Jurisdiction
    confidence: float  # 0.0 to 1.0

def resolve_primary_jurisdiction(payload: DSRIntakePayload) -> tuple[Jurisdiction, list[dict]]:
    signals: list[JurisdictionSignal] = []

    # 1. Explicit declaration (highest weight)
    if payload.explicit_jurisdiction:
        try:
            declared = Jurisdiction(payload.explicit_jurisdiction)
        except ValueError:
            # An unrecognized declaration is recorded as UNKNOWN instead of raising
            declared = Jurisdiction.UNKNOWN
        signals.append(JurisdictionSignal("explicit", declared, 0.95))

    # 2. Billing country (medium weight)
    if payload.billing_country:
        mapping = {"GB": Jurisdiction.GDPR, "DE": Jurisdiction.GDPR, "US-CA": Jurisdiction.CCPA_CPRA}
        signals.append(JurisdictionSignal("billing", mapping.get(payload.billing_country, Jurisdiction.UNKNOWN), 0.70))

    # 3. IP geolocation (lower weight, fallback)
    if payload.raw_ip:
        # In production, call internal GeoIP service here
        signals.append(JurisdictionSignal("ip_geo", Jurisdiction.UNKNOWN, 0.50))

    # Strictness ranking used to break confidence ties:
    # GDPR > CCPA/CPRA > VCDPA > UNKNOWN
    strictness_order = [Jurisdiction.GDPR, Jurisdiction.CCPA_CPRA, Jurisdiction.VCDPA, Jurisdiction.UNKNOWN]

    # Record every signal, including UNKNOWN ones, so the audit trail is never empty
    audit_trail = [{"source": s.source, "jurisdiction": s.jurisdiction.value, "weight": s.confidence} for s in signals]

    # Only signals that resolved to a known jurisdiction can win
    valid_signals = [s for s in signals if s.jurisdiction != Jurisdiction.UNKNOWN]
    if not valid_signals:
        return Jurisdiction.UNKNOWN, audit_trail

    # Highest confidence wins; on a tie, the stricter regulation wins
    resolved = max(valid_signals, key=lambda s: (s.confidence, -strictness_order.index(s.jurisdiction)))
    return resolved.jurisdiction, audit_trail
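`resolve_primary_jurisdiction` ranks by confidence first and uses strictness only as a tie-breaker. For comparison, here is a sketch of the opposite ordering, where the strictest regulation wins among all signals that clear the 0.60 confidence floor. The names `Regime`, `STRICTNESS`, and `strictest_credible` are hypothetical, and the tuple-based signal shape is an assumption chosen to keep the example self-contained.

```python
from enum import Enum


class Regime(Enum):  # hypothetical stand-in for the Jurisdiction enum
    GDPR = "gdpr"
    CCPA_CPRA = "ccpa_cpra"
    VCDPA = "vcdpa"
    UNKNOWN = "unknown"


# Lower index means stricter, mirroring GDPR > CCPA/CPRA > VCDPA
STRICTNESS = [Regime.GDPR, Regime.CCPA_CPRA, Regime.VCDPA]
CONFIDENCE_FLOOR = 0.60


def strictest_credible(signals: list[tuple[str, Regime, float]]) -> tuple[Regime, float, bool]:
    """Return (regime, confidence, conflicted) using a strictest-wins policy."""
    credible = [
        (source, regime, conf)
        for source, regime, conf in signals
        if regime is not Regime.UNKNOWN and conf >= CONFIDENCE_FLOOR
    ]
    if not credible:
        return Regime.UNKNOWN, 0.0, False
    # More than one distinct regime among credible signals counts as a conflict
    conflicted = len({regime for _, regime, _ in credible}) > 1
    # Pick the strictest regime first; use confidence only to break ties
    _, regime, conf = min(credible, key=lambda s: (STRICTNESS.index(s[1]), -s[2]))
    return regime, conf, conflicted


result = strictest_credible([
    ("billing", Regime.GDPR, 0.70),
    ("explicit", Regime.CCPA_CPRA, 0.95),
    ("ip_geo", Regime.UNKNOWN, 0.50),
])
```

Here the GDPR billing signal wins even though the explicit CCPA/CPRA declaration has higher confidence, and the `conflicted` flag gives the caller a hook for compliance review. Which ordering is appropriate is a policy decision, not a technical one.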
3. SLA Calculation & Statutory Tolling
Once jurisdiction is resolved, the router must compute the statutory response deadline. For example, a consumer with a California IP address but a UK billing profile triggers a dual-scope evaluation. The router calculates both the GDPR response window (one month under Article 12(3), modeled here as 30 days) and the 45-day CCPA/CPRA window, then applies the shorter deadline while flagging the payload for compliance officer review.
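This deterministic SLA mapping relies on Python's datetime and zoneinfo modules for timezone-aware arithmetic. The statutory windows are counted in calendar days, so the deadline is a plain offset from the submission timestamp; counting only business days would produce a date later than the statute allows. zoneinfo has no knowledge of public holidays, so any rule that moves a deadline falling on a non-working day needs a separate calendar source. The implementation must also handle extensions (e.g., the CCPA's additional 45 days with consumer notice, the GDPR's extension by two further months for complex requests) without resetting the initial SLA clock; here they are passed in as tolling_days.

from datetime import timedelta
from zoneinfo import ZoneInfo

def calculate_sla_deadline(jurisdiction: Jurisdiction, submitted_at: datetime, tolling_days: int = 0) -> datetime:
    # Response windows in calendar days; GDPR's one month is approximated as 30 days
    base_days = {
        Jurisdiction.GDPR: 30,
        Jurisdiction.CCPA_CPRA: 45,
        Jurisdiction.VCDPA: 45,
    }.get(jurisdiction, 45)

    # A naive datetime would be silently interpreted as local time by astimezone()
    if submitted_at.tzinfo is None:
        raise ValueError("submitted_at must be timezone-aware")

    # Normalize to UTC so deadlines compare consistently across regions
    received = submitted_at.astimezone(ZoneInfo("UTC"))

    # Extensions and tolling push the deadline out without moving the start of the clock
    return received + timedelta(days=base_days + tolling_days)

Note: GDPR's window is one calendar month rather than a fixed 30 days; python-dateutil's relativedelta(months=1) can express that exactly. For jurisdiction-specific public holidays, use a maintained calendar library such as holidays or workalendar, and see the official Python zoneinfo documentation for timezone-aware scheduling.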
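The dual-scope evaluation described at the start of this section can be sketched as follows. This is an illustration under stated assumptions: `dual_scope_deadline`, `WINDOW_DAYS`, and `EXTENSION_DAYS` are hypothetical names, the sketch counts plain calendar days and ignores holidays, and GDPR's one-month window and two-month extension are approximated as 30 and 60 days.

```python
from datetime import datetime, timedelta, timezone

# Response windows in calendar days, as quoted in the text (GDPR month approximated)
WINDOW_DAYS = {"gdpr": 30, "ccpa_cpra": 45}
# Extension lengths in days: CCPA's additional 45 days; GDPR's two months approximated as 60
EXTENSION_DAYS = {"gdpr": 60, "ccpa_cpra": 45}


def dual_scope_deadline(submitted_at: datetime, regimes: list[str], extended: bool = False) -> dict:
    """Compute one deadline per regime and apply the earliest of them."""
    if submitted_at.tzinfo is None:
        raise ValueError("submitted_at must be timezone-aware")
    start = submitted_at.astimezone(timezone.utc)
    deadlines = {}
    for regime in regimes:
        days = WINDOW_DAYS[regime] + (EXTENSION_DAYS[regime] if extended else 0)
        deadlines[regime] = start + timedelta(days=days)
    # The binding regime is the one whose deadline arrives first
    binding = min(deadlines, key=deadlines.get)
    return {
        "deadlines": deadlines,
        "binding_regime": binding,
        "effective_deadline": deadlines[binding],
        # More than one applicable regime is flagged for compliance officer review
        "needs_compliance_review": len(regimes) > 1,
    }


result = dual_scope_deadline(
    datetime(2024, 3, 1, 12, 0, tzinfo=timezone.utc), ["gdpr", "ccpa_cpra"]
)
```

For a request received on 1 March 2024, the GDPR deadline (31 March) binds ahead of the CCPA/CPRA deadline (15 April). Keeping both computed deadlines in the result lets the audit record show why the shorter one was applied.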
4. Secure PII Envelope & Taxonomy Normalization
Secure PII handling begins at the intake boundary. The router never stores raw consumer identifiers in plaintext. Instead, it applies envelope encryption using a hardware-backed KMS, routing only the encrypted payload reference and metadata to the processing queue. Taxonomy normalization occurs concurrently, mapping heterogeneous consumer inputs to a canonical internal schema.
import os
import json
import base64
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

class SecureIntakeEnvelope:
    def __init__(self, kms_client):
        # kms_client is expected to expose a boto3-style encrypt(KeyId=..., Plaintext=...) call
        self.kms = kms_client

    def wrap_payload(self, payload: DSRIntakePayload, jurisdiction: Jurisdiction, deadline: datetime) -> dict:
        # 1. Serialize raw PII
        raw_pii = json.dumps({"email": payload.consumer_email, "request_id": payload.request_id}).encode()

        # 2. Generate a one-time data encryption key (DEK) and encrypt (envelope pattern)
        dek = AESGCM.generate_key(bit_length=256)
        aesgcm = AESGCM(dek)
        nonce = os.urandom(12)  # 96-bit nonce; the DEK is never reused, so nonces cannot collide
        ciphertext = aesgcm.encrypt(nonce, raw_pii, None)

        # 3. Encrypt DEK with KMS
        encrypted_dek = self.kms.encrypt(KeyId="alias/dsr-intake-key", Plaintext=dek)

        # 4. Construct routing envelope (NO raw PII).
        #    Binary fields are base64-encoded so the envelope stays JSON-serializable.
        return {
            "correlation_id": payload.request_id,
            "jurisdiction": jurisdiction.value,
            "sla_deadline": deadline.isoformat(),
            "request_type": payload.request_type,
            "encrypted_dek_blob": base64.b64encode(encrypted_dek["CiphertextBlob"]).decode(),
            "ciphertext_b64": base64.b64encode(nonce + ciphertext).decode(),
            "kms_key_id": encrypted_dek["KeyId"]
        }
For cryptographic implementation standards, align with NIST SP 800-57 Part 1 Rev. 5 guidelines on key management and envelope encryption practices.
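The taxonomy normalization step mentioned at the start of this section is not shown above. A minimal sketch could look like the following, assuming free-text consumer input and simple phrase matching. `PHRASE_MAP` and `normalize_request_type` are hypothetical, and the phrase table is illustrative only; a real table would need legal review and likely a more robust matcher.

```python
import re

CANONICAL_TYPES = ("access", "deletion", "rectification", "opt_out_sale")

# Hypothetical phrase table mapping consumer wording to canonical request types
PHRASE_MAP = {
    "access": ("access", "copy of my data", "what data do you have"),
    "deletion": ("delete", "erase", "remove my data"),
    "rectification": ("correct", "rectify", "update my"),
    "opt_out_sale": ("do not sell", "opt out", "remove my data"),
}


def normalize_request_type(raw_text: str) -> dict:
    """Map free-text input to one canonical type, or flag it for clarification."""
    # Lowercase and collapse whitespace so matching is insensitive to formatting
    text = re.sub(r"\s+", " ", raw_text.strip().lower())
    matches = [
        canonical
        for canonical in CANONICAL_TYPES
        if any(phrase in text for phrase in PHRASE_MAP[canonical])
    ]
    if len(matches) == 1:
        return {"request_type": matches[0], "status": "NORMALIZED"}
    # Zero or several candidates: do not guess, ask the consumer
    return {"request_type": None, "status": "REQUIRES_CLARIFICATION", "candidates": matches}


clear = normalize_request_type("Please DELETE my account")
ambiguous = normalize_request_type("remove my data")
```

The ambiguous phrase deliberately appears under both deletion and opt_out_sale, so the function returns both candidates instead of choosing one. Only an unambiguous match produces a value that the payload model's request_type field would accept.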
5. Fallback Routing & Escalation Workflows
No intake system operates in a vacuum. Network partitions, KMS throttling, or ambiguous jurisdictional signals require deterministic fallback routing. The router implements a three-tier escalation matrix:
- Circuit-Breaker Fallback: If the KMS or GeoIP resolver exceeds timeout thresholds, the payload is routed to a regional dead-letter queue (DLQ) with a PENDING_ENCRYPTION state. A background worker retries with exponential backoff.
- Jurisdictional Ambiguity: When signal confidence falls below 0.60 or conflicts cannot be resolved via the precedence matrix, the router defaults to Jurisdiction.UNKNOWN and routes to a MANUAL_TRIAGE queue. The SLA clock is paused until a compliance officer assigns a definitive jurisdiction.
- Taxonomy Mismatch: If request_type normalization fails (e.g., consumer submits "remove my data" which maps ambiguously between deletion and opt-out), the payload is flagged with a REQUIRES_CLARIFICATION status. Automated email templates are dispatched to the consumer, and the intake event is parked in a CLARIFICATION_HOLD state.
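class FallbackRouter:
    def __init__(self, queue_client):
        self.queue = queue_client  # placeholder for the deployment's queue producer

    def route_with_fallback(self, envelope: dict, confidence: float) -> str:
        if confidence < 0.60:
            return self._send_to_queue("MANUAL_TRIAGE", envelope, status="AMBIGUOUS_JURISDICTION")
        if envelope.get("kms_error"):
            return self._send_to_queue("ENCRYPTION_DLQ", envelope, status="KMS_FAILURE")
        return self._send_to_queue("FULFILLMENT_WORKER", envelope, status="READY")

    def _send_to_queue(self, queue_name: str, envelope: dict, status: str) -> str:
        # Placeholder transport: publish() is an assumed method on the injected client
        self.queue.publish(queue_name, {**envelope, "status": status})
        return queue_name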
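The exponential backoff used by the DLQ retry worker is not shown above. One way to sketch it, assuming a capped doubling schedule with full jitter, is below. `backoff_delays` and `retry_with_backoff` are hypothetical helpers, the default base and cap are placeholders, and the `sleep` parameter exists only so the loop can be exercised without waiting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def backoff_delays(max_attempts: int, base_seconds: float = 1.0, cap_seconds: float = 60.0) -> list[float]:
    """Upper bound of the wait before each retry: base * 2**n, capped."""
    return [min(cap_seconds, base_seconds * (2 ** attempt)) for attempt in range(max_attempts - 1)]


def retry_with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_seconds: float = 1.0,
    cap_seconds: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds or the attempts are exhausted."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(max_attempts, base_seconds, cap_seconds)
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # Out of retries: surface the error so the item can be escalated
            # Full jitter: wait a random time up to the bound so workers do not retry in lockstep
            sleep(random.uniform(0, delays[attempt]))
    raise AssertionError("unreachable: the loop always returns or raises")
```

A worker would wrap the KMS call in `retry_with_backoff` and move the payload out of PENDING_ENCRYPTION only after it returns. Catching every exception keeps the sketch short; a real worker would more likely retry only on timeout and throttling errors.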
6. Observability & Debugging Routing Anomalies
The routing decision is serialized into an immutable JSON audit record, capturing the exact signal weights, jurisdictional overrides, and computed deadline. Privacy engineering teams rely on this structured audit trail to rapidly isolate routing anomalies. Each log entry must include:
- correlation_id
- signal_matrix_snapshot
- jurisdiction_override_reason
- sla_computed_at
- routing_destination_queue
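One possible shape for such an audit record, using the five field names listed above, is sketched here. The class name `RoutingAuditRecord`, the field types, and the sorted-key serialization are assumptions. A frozen dataclass only prevents in-process mutation, so immutability of the stored record would still depend on the audit store itself.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class RoutingAuditRecord:
    correlation_id: str
    signal_matrix_snapshot: tuple  # one dict per signal that was evaluated
    jurisdiction_override_reason: Optional[str]
    sla_computed_at: str  # ISO 8601 timestamp in UTC
    routing_destination_queue: str

    def to_json(self) -> str:
        """Serialize with sorted keys so identical records produce identical output."""
        return json.dumps(asdict(self), sort_keys=True, separators=(",", ":"))


record = RoutingAuditRecord(
    correlation_id="example-correlation-id",
    signal_matrix_snapshot=(
        {"source": "billing", "jurisdiction": "gdpr", "weight": 0.70},
    ),
    jurisdiction_override_reason=None,
    sla_computed_at=datetime(2024, 3, 1, tzinfo=timezone.utc).isoformat(),
    routing_destination_queue="FULFILLMENT_WORKER",
)
line = record.to_json()
```

Emitting one compact JSON line per routing decision keeps the log easy to query by correlation_id.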
When debugging duplicate submissions or SLA drift, engineers query the audit store using the correlation ID. Idempotency checks at the ingress gateway prevent double-processing, while structured logging ensures that every state transition is traceable across distributed queues. By enforcing strict schema validation, deterministic precedence matrices, and envelope encryption at the boundary, the jurisdiction-aware intake router transforms regulatory complexity into a predictable, auditable engineering workflow.