OpenSPP as DCI Client
Contents
OpenSPP as DCI Client#
This guide is for developers implementing OpenSPP as a DCI client to consume data from external registries.
Overview#
When acting as a DCI client, OpenSPP queries external DCI-compliant systems to:
Import birth/death records from national CRVS
Check enrollment status in other programs (IBR) to prevent duplication
Query disability status from national Disability Registry (DR)
Perform federated beneficiary lookups across multiple social registries
Architecture#
sequenceDiagram
participant OpenSPP as OpenSPP DCI Client
participant External as External Registry
participant Registry as res.partner
OpenSPP->>External: POST /oauth2/client/token
External-->>OpenSPP: access_token
OpenSPP->>External: POST /registry/sync/search<br/>(Authorization: Bearer token)
External-->>OpenSPP: DCI Search Response
OpenSPP->>OpenSPP: Map DCI Person → partner vals
OpenSPP->>Registry: create(vals) or write(vals)
Registry-->>OpenSPP: partner_id
Module Structure#
Client Modules#
| Module | Purpose |
|---|---|
| `spp_dci_client` | Base client with OAuth, signing, and search |
| `spp_dci_client_crvs` | CRVS-specific client for birth/death imports |
| `spp_dci_client_ibr` | IBR client for enrollment duplication checks |
| `spp_dci_client_dr` | Disability Registry client for PWD queries |
| `spp_dci_indicators` | DCI data integration with eligibility CEL expressions |
Key Components#
spp_dci_client/
├── models/
│ ├── dci_data_source.py # External registry connection config
│ ├── dci_signing_key.py # Client signing keys
│ └── dci_import_log.py # Audit trail of imports
├── services/
│ ├── dci_client.py # Generic DCI client
│ ├── auth_service.py # OAuth token management
│ └── mapper.py # DCI → Odoo data mapping
└── wizards/
└── fetch_wizard.py # UI for manual imports
Base DCI Client#
Data Source Configuration#
# spp_dci_client/models/dci_data_source.py
from odoo import models, fields
class DCIDataSource(models.Model):
    """Connection settings for one external DCI registry.

    A record stores where the registry lives (base URL plus endpoint paths),
    the OAuth2 client credentials, the identity this installation presents
    as sender, and an optional signing key. ``state`` reflects the outcome
    of the most recent connection test.
    """

    _name = 'spp.dci.data.source'
    _description = 'DCI External Registry Connection'

    name = fields.Char(required=True)
    registry_type = fields.Selection([
        ('crvs', 'CRVS - Civil Registration'),
        ('social', 'Social Registry'),
        ('ibr', 'Integrated Beneficiary Registry'),
        ('fr', 'Farmer Registry'),
        ('dr', 'Disability Registry')
    ], required=True)

    # Connection: endpoint paths are concatenated onto base_url by the client.
    base_url = fields.Char(required=True)
    auth_endpoint = fields.Char(default='/oauth2/client/token')
    search_endpoint = fields.Char(default='/registry/sync/search')
    jwks_endpoint = fields.Char(default='/.well-known/jwks.json')

    # Credentials for the OAuth2 client-credentials grant
    client_id = fields.Char(required=True)
    client_secret = fields.Char(required=True)

    # Our identity, sent as sender_id in DCI message headers
    our_sender_id = fields.Char(required=True)
    our_callback_uri = fields.Char()

    # Signing: when set, outgoing requests are signed with this key
    signing_key_id = fields.Many2one('spp.dci.signing.key')

    # State of the last connection test
    state = fields.Selection([
        ('draft', 'Draft'),
        ('active', 'Active'),
        ('error', 'Error')
    ], default='draft')

    def action_test_connection(self):
        """Test connection to the external registry.

        Authenticates against the registry and marks the record ``active``
        on success.

        Returns:
            An ``ir.actions.client`` notification action announcing success.

        Raises:
            UserError: If authentication fails for any reason; the original
                exception is chained as the cause.
        """
        # Function-scope imports keep this snippet self-contained.
        import asyncio

        from odoo.exceptions import UserError

        self.ensure_one()
        # NOTE(review): _get_client() is not defined in this snippet; it is
        # presumably provided elsewhere and returns a DCIClient - confirm.
        client = self._get_client()
        try:
            # DCIClient.authenticate() is a coroutine function. Calling it
            # without running the coroutine performs no request and never
            # raises, so every test would be reported as successful.
            asyncio.run(client.authenticate())
        except Exception as e:
            # NOTE(review): raising presumably rolls back this write in a
            # normal Odoo request, so the 'error' state may not persist -
            # confirm whether it should be written in a separate cursor.
            self.state = 'error'
            raise UserError(f"Connection failed: {e}") from e

        self.state = 'active'
        return {
            'type': 'ir.actions.client',
            'tag': 'display_notification',
            'params': {
                'title': 'Success',
                'message': 'Connection successful!',
                'type': 'success'
            }
        }
Generic Client Service#
# spp_dci_client/services/dci_client.py
import uuid
from datetime import datetime, timezone

import httpx

from ..schemas import DCISearchRequest, DCISearchResponse
class DCIClient:
    """Generic DCI client for any registry type.

    Wraps one data-source record: obtains an OAuth2 token with the
    client-credentials grant, builds and optionally signs DCI search
    envelopes, and posts them to the registry's search endpoint.

    NOTE(review): ``_default_reg_type()`` and ``_extract_receiver_id()`` are
    called below but are not defined in this class as shown; they must be
    provided elsewhere (for example by a subclass).
    """

    # Seconds before the recorded expiry at which a cached token is treated
    # as stale, so it does not lapse while a request is in flight.
    _TOKEN_EXPIRY_MARGIN = 30

    def __init__(self, data_source):
        """Bind the client to *data_source* with an empty token cache."""
        self.data_source = data_source
        self._token = None
        self._token_expires = None

    async def authenticate(self) -> str:
        """Get an OAuth2 access token and cache it.

        Returns:
            The access token string from the token endpoint's response.

        Raises:
            httpx.HTTPStatusError: If the token endpoint answers with an
                error status.
            KeyError: If the response carries no ``access_token``.
        """
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{self.data_source.base_url}{self.data_source.auth_endpoint}",
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.data_source.client_id,
                    "client_secret": self.data_source.client_secret
                }
            )
            response.raise_for_status()
            data = response.json()
            self._token = data["access_token"]
            # Fall back to one hour when the server omits expires_in.
            self._token_expires = datetime.now().timestamp() + data.get("expires_in", 3600)
            return self._token

    async def _get_token(self) -> str:
        """Return the cached token, authenticating again when it is stale.

        A token counts as stale when none is cached, no expiry is recorded,
        or the expiry is less than ``_TOKEN_EXPIRY_MARGIN`` seconds away.
        """
        if self._token and self._token_expires:
            remaining = self._token_expires - datetime.now().timestamp()
            if remaining > self._TOKEN_EXPIRY_MARGIN:
                return self._token
        return await self.authenticate()

    async def search_by_identifier(
        self,
        identifier_type: str,
        identifier_value: str
    ) -> DCISearchResponse:
        """Search by identifier type and value (``idtype-value`` query)."""
        return await self.search(
            query_type="idtype-value",
            query={
                "type": identifier_type,
                "value": identifier_value
            }
        )

    async def search_by_expression(
        self,
        conditions: list,
        logic: str = "and"
    ) -> DCISearchResponse:
        """Search with a complex expression.

        Args:
            conditions: Condition dicts to combine.
            logic: ``"and"`` wraps all conditions in a single ``seq``; any
                other value wraps each condition in its own ``seq`` under
                an ``or`` key.
        """
        if logic == "and":
            expression = {"seq": conditions}
        else:
            expression = {"or": [{"seq": [c]} for c in conditions]}
        return await self.search(
            query_type="expression",
            query={"expression": expression}
        )

    async def search(
        self,
        query_type: str,
        query: dict,
        reg_type: str = None,
        page_size: int = 100
    ) -> DCISearchResponse:
        """Execute a sync search and parse the response.

        Args:
            query_type: DCI query type placed in the search criteria.
            query: Query payload matching *query_type*.
            reg_type: Registry type; defaults to ``_default_reg_type()``.
            page_size: Page size requested in the pagination block.

        Raises:
            httpx.HTTPStatusError: If the registry answers with an error
                status.
        """
        token = await self._get_token()
        request = self._build_search_request(
            query_type=query_type,
            query=query,
            reg_type=reg_type or self._default_reg_type(),
            page_size=page_size
        )
        # Sign request
        request = self._sign_request(request)
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(
                f"{self.data_source.base_url}{self.data_source.search_endpoint}",
                json=request.dict(),
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json"
                }
            )
            response.raise_for_status()
            return DCISearchResponse(**response.json())

    def _build_search_request(
        self,
        query_type: str,
        query: dict,
        reg_type: str,
        page_size: int
    ) -> DCISearchRequest:
        """Build an unsigned DCI search request holding a single search.

        Message, transaction and reference IDs are fresh UUID4 values. Both
        timestamps are the same timezone-aware UTC instant, so the ISO
        string carries an explicit offset.
        """
        now = datetime.now(timezone.utc)
        return DCISearchRequest(
            signature="",
            header={
                "version": "1.0.0",
                "message_id": str(uuid.uuid4()),
                "message_ts": now.isoformat(),
                "action": "search",
                "sender_id": self.data_source.our_sender_id,
                "receiver_id": self._extract_receiver_id(),
                "total_count": 1
            },
            message={
                "transaction_id": str(uuid.uuid4()),
                "search_request": [{
                    "reference_id": str(uuid.uuid4()),
                    "timestamp": now.isoformat(),
                    "search_criteria": {
                        "version": "1.0.0",
                        "reg_type": reg_type,
                        "query_type": query_type,
                        "query": query,
                        "pagination": {
                            "page_size": page_size,
                            "page_number": 1
                        }
                    }
                }]
            }
        )

    def _sign_request(self, request: DCISearchRequest) -> DCISearchRequest:
        """Add a signature to *request* when a signing key is configured.

        Without a signing key the request is returned unchanged, keeping
        the empty signature set by ``_build_search_request``.
        """
        if self.data_source.signing_key_id:
            request.signature = self.data_source.signing_key_id.sign(
                request.header,
                request.message
            )
        return request
CRVS Client#
Import birth and death records from national Civil Registration systems.
Client Implementation#
# spp_dci_client_crvs/services/crvs_client.py
from datetime import date
from odoo.addons.spp_dci_client.services.dci_client import DCIClient
class CRVSClient(DCIClient):
    """Specialized client for CRVS systems.

    Adds date-range searches for birth and death registrations and an
    importer that upserts a CRVS person into ``res.partner``.
    """

    async def search_births(
        self,
        date_from: date,
        date_to: date,
        location: str = None,
        page_size: int = 100
    ) -> list:
        """Search for birth registrations in a date range.

        Args:
            date_from: First birth date to include.
            date_to: Last birth date to include.
            location: When given, also require ``birth_place.name`` to
                equal this value.
            page_size: Page size requested from the registry.

        Returns:
            The ``reg_records`` of the first search response, or ``[]``.
        """
        extra_conditions = []
        if location:
            extra_conditions.append({
                "attribute": "birth_place.name",
                "operator": "=",
                "value": location
            })
        return await self._search_date_range(
            "birth_date", date_from, date_to, extra_conditions, page_size
        )

    async def search_deaths(
        self,
        date_from: date,
        date_to: date,
        page_size: int = 100
    ) -> list:
        """Search for death registrations in a date range.

        Returns:
            The ``reg_records`` of the first search response, or ``[]``.
        """
        return await self._search_date_range(
            "death_date", date_from, date_to, [], page_size
        )

    async def _search_date_range(
        self,
        attribute: str,
        date_from: date,
        date_to: date,
        extra_conditions: list,
        page_size: int
    ) -> list:
        """Run an AND-combined search bounding *attribute* by two dates."""
        conditions = [
            {"attribute": attribute, "operator": ">=", "value": date_from.isoformat()},
            {"attribute": attribute, "operator": "<=", "value": date_to.isoformat()},
            *extra_conditions,
        ]
        # search() is called directly because search_by_expression() has no
        # page_size parameter; the "seq" wrapper is the same one it builds
        # for AND logic.
        response = await self.search(
            query_type="expression",
            query={"expression": {"seq": conditions}},
            page_size=page_size
        )
        return self._extract_records(response)

    @staticmethod
    def _extract_records(response) -> list:
        """Return ``reg_records`` from the first search response, else []."""
        results = response.message.search_response
        if not results:
            return []
        # Tolerate a null data block or a null reg_records entry.
        data = results[0].data or {}
        return data.get("reg_records") or []

    @staticmethod
    def _find_partner(env, identifier: dict):
        """Find one partner holding *identifier* (type + value), if any.

        NOTE(review): the two domain leaves traverse ``registry_id_ids``
        separately, so a partner whose type and value sit on different ID
        rows may also match - confirm against Odoo domain semantics.
        """
        return env['res.partner'].search([
            ('registry_id_ids.id_type_id.namespace_uri', '=', identifier['identifier_type']),
            ('registry_id_ids.value', '=', identifier['identifier_value'])
        ], limit=1)

    def import_person(self, crvs_person: dict, env) -> int:
        """Import a CRVS person into the OpenSPP registry.

        The first partner matching any of the person's identifiers is
        updated in place. Otherwise a new partner is created and linked to
        any parents that can be found.

        Returns:
            The ID of the updated or newly created partner.
        """
        # NOTE(review): CRVSPersonMapper is not imported in this snippet;
        # its module path is not visible here and must be imported.
        mapper = CRVSPersonMapper()
        vals = mapper.from_dci(crvs_person, env)

        # Check for existing by identifier
        for identifier in crvs_person.get("identifier") or []:
            existing = self._find_partner(env, identifier)
            if existing:
                existing.write(vals)
                return existing.id

        # Create new
        partner = env['res.partner'].create(vals)
        # Link parent relationships (only done for newly created partners)
        self._link_parents(partner, crvs_person, env)
        return partner.id

    def _link_parents(self, partner, crvs_person: dict, env):
        """Link parent identifiers to existing partners.

        For each parent identifier present on the record, create a parent
        relationship when a matching partner exists; unknown parents are
        skipped silently.
        """
        for parent_field in ('parent1_identifier', 'parent2_identifier'):
            parent_identifier = crvs_person.get(parent_field)
            if not parent_identifier:
                continue
            parent = self._find_partner(env, parent_identifier)
            if parent:
                env['spp.family.relationship'].create({
                    'individual_id': partner.id,
                    'related_individual_id': parent.id,
                    'relationship_type_id': env.ref('spp_base.relationship_parent').id
                })
Usage Example#
# Import births from last month
import asyncio
import logging
from datetime import date, timedelta

from odoo.addons.spp_dci_client_crvs.services.crvs_client import CRVSClient

_logger = logging.getLogger(__name__)

# Configure CRVS data source
crvs_source = env['spp.dci.data.source'].search([('registry_type', '=', 'crvs')], limit=1)
client = CRVSClient(crvs_source)

# Query births for the last 30 days. search_births() is a coroutine function,
# so it is driven with asyncio.run() instead of a top-level `await`, which is
# a syntax error outside a coroutine.
today = date.today()
last_month = today - timedelta(days=30)
births = asyncio.run(client.search_births(date_from=last_month, date_to=today))

# Import each birth
for birth in births:
    partner_id = client.import_person(birth, env)
    _logger.info("Imported birth: partner_id=%s", partner_id)
IBR Client#
Check enrollment status in other programs to prevent duplication.
Client Implementation#
# spp_dci_client_ibr/services/ibr_client.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class ExternalEnrollment:
    """Represents an enrollment in an external program.

    Attributes:
        programme_name: Name of the external programme.
        programme_id: Identifier of the external programme.
        enrollment_status: One of active, suspended, graduated, deceased.
        benefit_type: Type of benefit delivered by the programme.
        implementing_institution: Institution running the programme.
        enrollment_date: Enrollment date as received, if provided.
        last_benefit_date: Date of the last benefit as received, if provided.
    """

    # Required fields; positional order is part of the constructor contract.
    programme_name: str
    programme_id: str
    enrollment_status: str  # active, suspended, graduated, deceased
    benefit_type: str
    implementing_institution: str

    # Optional fields; left as None when the registry omits them.
    enrollment_date: Optional[str] = None
    last_benefit_date: Optional[str] = None
class IBRClient(DCIClient):
    """Client for Integrated Beneficiary Registry.

    NOTE(review): ``DCIClient`` is not imported in this snippet; it lives in
    ``odoo.addons.spp_dci_client.services.dci_client``. The helper
    ``_build_enrolled_request()`` is called below but not shown here.
    """

    # Path of the IBR-specific enrollment lookup, appended to base_url.
    ENROLLED_ENDPOINT = "/registry/sync/enrolled"

    async def check_enrollment(
        self,
        identifier_type: str,
        identifier_value: str
    ) -> list[ExternalEnrollment]:
        """Check if a person is enrolled in any programs in the IBR.

        Returns:
            One ``ExternalEnrollment`` per entry of the response's
            ``enrolled_programmes``; empty when there are none.
        """
        # Use the IBR-specific /registry/sync/enrolled endpoint
        message = await self._call_enrolled_endpoint(identifier_type, identifier_value)
        # `or` guards cover keys that are present but explicitly null.
        programmes = (message or {}).get('enrolled_programmes') or []
        return [self._to_enrollment(record) for record in programmes]

    @staticmethod
    def _to_enrollment(record: dict) -> ExternalEnrollment:
        """Convert one ``enrolled_programmes`` entry to a dataclass."""
        benefit = record.get('benefit') or {}
        return ExternalEnrollment(
            programme_name=record.get('programme_name'),
            programme_id=record.get('programme_identifier'),
            enrollment_status=record.get('enrollment_status'),
            benefit_type=benefit.get('benefit_type'),
            implementing_institution=record.get('implementing_institution'),
            enrollment_date=record.get('enrollment_date'),
            last_benefit_date=record.get('last_benefit_date')
        )

    async def _call_enrolled_endpoint(
        self,
        identifier_type: str,
        identifier_value: str
    ) -> dict:
        """Call the IBR-specific /registry/sync/enrolled endpoint.

        Returns:
            The ``message`` object of the JSON response, or ``{}`` when it
            is missing or null.

        Raises:
            httpx.HTTPStatusError: If the registry answers with an error
                status.
        """
        # Function-scope import: this snippet has no module-level httpx import.
        import httpx

        token = await self._get_token()
        request = self._build_enrolled_request(identifier_type, identifier_value)
        request = self._sign_request(request)
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"{self.data_source.base_url}{self.ENROLLED_ENDPOINT}",
                json=request.dict(),
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json"
                }
            )
            response.raise_for_status()
            return response.json().get('message') or {}
Integration with Enrollment Workflow#
# spp_dci_client_ibr/models/program_membership.py
from odoo import models, fields, api
from odoo.exceptions import UserError
class ProgramMembershipIBRCheck(models.Model):
    """Run an IBR duplicate-enrollment check when memberships are created.

    The outcome is written into the creation values, so it is stored on the
    new membership record itself.
    """

    _inherit = 'spp.program.membership'

    # IBR check results
    ibr_checked = fields.Boolean(default=False)
    ibr_check_date = fields.Datetime()
    ibr_enrollments_found = fields.Integer()
    ibr_duplicate_warning = fields.Text()

    @api.model_create_multi
    def create(self, vals_list):
        """Check IBR before creating each enrollment that names a partner."""
        for vals in vals_list:
            if vals.get('partner_id'):
                self._check_ibr_enrollment(vals)
        return super().create(vals_list)

    def _check_ibr_enrollment(self, vals: dict):
        """Check IBR and add warnings/blocks to *vals* in place.

        The check is skipped when no active IBR data source exists or the
        partner has no primary registry ID.

        Raises:
            UserError: If an active external enrollment exists and the
                ``ibr.block_duplicates`` system parameter is ``'true'``.
        """
        # Function-scope import: asyncio is not imported at module level.
        import asyncio

        # Get IBR data source
        ibr_source = self.env['spp.dci.data.source'].search([
            ('registry_type', '=', 'ibr'),
            ('state', '=', 'active')
        ], limit=1)
        if not ibr_source:
            return  # No IBR configured, skip check

        partner = self.env['res.partner'].browse(vals['partner_id'])
        primary_id = partner.registry_id_ids.filtered(lambda r: r.id_type_id.is_primary)[:1]
        if not primary_id:
            return

        # Check IBR
        from odoo.addons.spp_dci_client_ibr.services.ibr_client import IBRClient
        client = IBRClient(ibr_source)
        # NOTE(review): asyncio.run() cannot be called while another event
        # loop is running in this thread - confirm for the deployment.
        enrollments = asyncio.run(
            client.check_enrollment(
                primary_id.id_type_id.namespace_uri,
                primary_id.value
            )
        )

        vals['ibr_checked'] = True
        vals['ibr_check_date'] = fields.Datetime.now()
        vals['ibr_enrollments_found'] = len(enrollments)

        # Filter active enrollments
        active = [e for e in enrollments if e.enrollment_status == 'active']
        if not active:
            return

        program_names = [e.programme_name for e in active]
        vals['ibr_duplicate_warning'] = (
            f"Warning: Already enrolled in: {', '.join(program_names)}"
        )

        # Optionally block enrollment. sudo() is needed because ordinary
        # users cannot read ir.config_parameter directly.
        block_duplicates = (
            self.env['ir.config_parameter'].sudo().get_param('ibr.block_duplicates')
        )
        if block_duplicates == 'true':
            raise UserError(
                f"Cannot enroll {partner.name}: Already enrolled in {program_names[0]}"
            )
Usage Example#
# Check enrollment before creating membership
import logging

_logger = logging.getLogger(__name__)

partner = env['res.partner'].browse(123)

# Create enrollment (will trigger IBR check automatically)
membership = env['spp.program.membership'].create({
    'partner_id': partner.id,
    'program_id': env.ref('spp_programs.program_cash_transfer').id
})

# Check results: the warning text is filled in during create() when an
# active external enrollment was found.
if membership.ibr_duplicate_warning:
    _logger.warning("Duplicate enrollment: %s", membership.ibr_duplicate_warning)
Disability Registry Client#
Query external Disability Registries for PWD status.
Client Implementation#
# spp_dci_client_dr/services/dr_client.py
class DRClient(DCIClient):
    """Client for Disability Registry.

    NOTE(review): ``DCIClient`` is not imported in this snippet; it lives in
    ``odoo.addons.spp_dci_client.services.dci_client``.
    """

    # Lowest functional_severity value counted as a severe disability.
    SEVERE_THRESHOLD = 3

    async def get_disability_status(
        self,
        identifier_type: str,
        identifier_value: str
    ) -> list:
        """Query disability status for a person.

        Returns:
            The ``disability_info`` list of the first matching record, with
            limitation types and severity levels; ``[]`` when nothing
            matches or the record carries no disability information.
        """
        response = await self.search_by_identifier(identifier_type, identifier_value)
        results = response.message.search_response
        if not results:
            return []
        # `or` guards cover values that are present but explicitly null.
        records = (results[0].data or {}).get("reg_records") or []
        if not records:
            return []
        return records[0].get('disability_info') or []

    def has_severe_disability(self, disability_info: list) -> bool:
        """Check if a person has a severe disability (level 3 or 4).

        A missing or null ``functional_severity`` counts as 0.
        """
        return any(
            (entry.get('functional_severity') or 0) >= self.SEVERE_THRESHOLD
            for entry in disability_info
        )

    def get_disability_types(self, disability_info: list) -> list[str]:
        """Extract disability limitation types.

        Entries without a ``disability_limitation_type`` are skipped, so the
        result holds only strings, as the annotation promises.
        """
        return [
            entry['disability_limitation_type']
            for entry in disability_info
            if entry.get('disability_limitation_type') is not None
        ]
Integration with Eligibility#
# Using DR data in eligibility checks
import asyncio

from odoo.addons.spp_dci_client_dr.services.dr_client import DRClient

# Query DR for disability status
dr_source = env['spp.dci.data.source'].search([('registry_type', '=', 'dr')], limit=1)
client = DRClient(dr_source)
partner = env['res.partner'].browse(123)
national_id = partner.registry_id_ids.filtered(lambda r: r.id_type_id.is_primary)[:1]

if national_id:
    # get_disability_status() is a coroutine function, so it is driven with
    # asyncio.run() instead of a top-level `await`.
    disability_info = asyncio.run(client.get_disability_status(
        national_id.id_type_id.namespace_uri,
        national_id.value
    ))
else:
    # Without a primary ID there is nothing to look up in the registry.
    disability_info = []

# Check eligibility
is_eligible = client.has_severe_disability(disability_info)
DCI-Indicators Integration#
Integrate DCI data with OpenSPP's indicator system for CEL-based eligibility.
Available Metrics#
# spp_dci_indicators/services/dci_metrics.py
# Register DCI metrics for CEL expressions
# Time-to-live values in seconds, shared by the metric definitions below.
_ONE_HOUR = 60 * 60
_ONE_DAY = 24 * _ONE_HOUR

# DCI metrics exposed to CEL expressions, keyed by metric name. Each entry
# names the registry it reads from ('source'), its value type, a
# human-readable description, and its time-to-live.
METRICS = {
    'dci.crvs.is_alive': dict(
        source='crvs',
        type='boolean',
        description='Person is alive per CRVS',
        ttl=_ONE_DAY,
    ),
    'dci.ibr.is_enrolled': dict(
        source='ibr',
        type='boolean',
        description='Enrolled in any IBR program',
        ttl=_ONE_HOUR,
    ),
    'dci.ibr.program_count': dict(
        source='ibr',
        type='number',
        description='Number of active enrollments',
        ttl=_ONE_HOUR,
    ),
    'dci.dr.has_disability': dict(
        source='dr',
        type='boolean',
        description='Has any registered disability',
        ttl=_ONE_DAY,
    ),
    'dci.dr.disability_level': dict(
        source='dr',
        type='number',
        description='Highest disability severity level (1-4)',
        ttl=_ONE_DAY,
    ),
}
CEL Expressions with DCI Data#
# Example eligibility expressions using DCI metrics
# PWD Cash Transfer Program
"dci.dr.has_disability == true and dci.dr.disability_level >= 3 and dci.ibr.is_enrolled == false"
# Senior Citizen Allowance
"dci.crvs.is_alive == true and age_years(r.birthdate) >= 60"
# Poverty-targeted (exclude already enrolled)
"poverty_score >= 0.7 and dci.ibr.program_count == 0"
# Household with disabled member
"members.exists(m, m.dci.dr.has_disability == true)"
Testing#
Unit Test: CRVS Client#
# spp_dci_client_crvs/tests/test_crvs_client.py
from odoo.tests.common import TransactionCase
from unittest.mock import patch, AsyncMock
class TestCRVSClient(TransactionCase):
    """Unit tests for the CRVS client, with all HTTP traffic mocked."""

    def setUp(self):
        """Create a CRVS data source pointing at a placeholder host."""
        super().setUp()
        self.data_source = self.env['spp.dci.data.source'].create({
            'name': 'Test CRVS',
            'registry_type': 'crvs',
            'base_url': 'https://test-crvs.example.org',
            'client_id': 'test_client',
            'client_secret': 'test_secret',
            'our_sender_id': 'openspp.test'
        })

    @patch('httpx.AsyncClient.post')
    def test_search_births(self, mock_post):
        """Test birth search returns results"""
        # The test is synchronous on purpose: the unittest runner behind
        # TransactionCase does not await coroutine test methods, so an
        # `async def` test would pass without executing its assertions.
        import asyncio
        from datetime import date
        from unittest.mock import MagicMock

        from odoo.addons.spp_dci_client_crvs.services.crvs_client import CRVSClient

        # Plain MagicMock responses: raise_for_status() and json() are
        # synchronous on an httpx response.
        token_response = MagicMock(status_code=200)
        token_response.json.return_value = {
            "access_token": "test-token",
            "expires_in": 3600
        }
        # NOTE(review): confirm this minimal payload satisfies the
        # DCISearchResponse schema (no header or signature is supplied).
        search_response = MagicMock(status_code=200)
        search_response.json.return_value = {
            "message": {
                "search_response": [{
                    "data": {
                        "reg_records": [
                            {
                                "name": {"given_name": "John", "surname": "Doe"},
                                "birth_date": "2024-01-15"
                            }
                        ]
                    }
                }]
            }
        }

        def fake_post(url, *args, **kwargs):
            # Token and search requests share the patched post(); answer
            # each according to the URL it targets.
            if url.endswith(self.data_source.auth_endpoint):
                return token_response
            return search_response

        mock_post.side_effect = fake_post

        client = CRVSClient(self.data_source)
        births = asyncio.run(client.search_births(
            date_from=date(2024, 1, 1),
            date_to=date(2024, 12, 31)
        ))

        self.assertEqual(len(births), 1)
        self.assertEqual(births[0]['name']['given_name'], 'John')
See Also#
OpenSPP as DCI Server - OpenSPP as DCI server
DCI Protocol Details - DCI protocol details
openspp.org