src.api.http_adapter
HTTP REST API adapter
1"""HTTP REST API adapter""" 2 3import json 4from dataclasses import dataclass 5from typing import override 6from urllib.parse import urlparse 7 8import msgspec 9from flask import Flask, Request, Response, request 10from prometheus_flask_exporter import ( # pyright: ignore[reportMissingTypeStubs] PrometheusMetrics has no typing 11 PrometheusMetrics, 12) 13 14from src.config.config_port import ConfigRepoPort 15from src.credentials.credential_service import CredentialService 16from src.issuer_agent.issuer_agent_port import IssuerAgentError 17from src.metadata.metadata_service import HealthStatus, MetadataService 18from src.offers.offer_service import OfferService, PermissionDeniedError 19 20from .api_port import ApiPort 21 22 23class MissingTokenError(Exception): 24 """Raised when the Authorization header is absent or contains no token.""" 25 26 27@dataclass 28class CreateOfferBody: 29 """Parsed request body for the create offer endpoint.""" 30 31 award_id: str 32 33 34@dataclass 35class Proof: 36 """Proof object for credential request.""" 37 38 proof_type: str 39 jwt: str 40 41 42@dataclass 43class CredentialRequestBody: 44 """Parsed request body for the credential endpoint.""" 45 46 format: str 47 credential_configuration_id: str 48 proof: Proof 49 issuer_state: str 50 51 52class HttpApiAdapter(ApiPort): 53 """HTTP REST API adapter""" 54 55 flask_app: Flask 56 metadata_service: MetadataService 57 offer_service: OfferService 58 credential_service: CredentialService 59 config: ConfigRepoPort 60 61 def __init__( 62 self, 63 config: ConfigRepoPort, 64 metadata_service: MetadataService, 65 offer_service: OfferService, 66 credential_service: CredentialService, 67 ): 68 """Initialise the adapter. 69 70 Args: 71 config: Application configuration. 72 metadata_service: Service for credential issuer metadata. 73 offer_service: Service for creating credential offers. 74 credential_service: Service for requesting credentials. 75 """ 76 self.metadata_service = metadata_service 77 self.offer_service = offer_service 78 self.credential_service = credential_service 79 self.config = config 80 self.flask_app = self._flask_app() 81 82 def _flask_app(self) -> Flask: 83 app = Flask("HttpApi") 84 85 # Metrics endpoint is only relevant to HttpAdapter 86 # no need for service/domain models 87 metrics: PrometheusMetrics = PrometheusMetrics(app) 88 _ = metrics.info("app_info", "Application info", version="1.0.3") # pyright: ignore[reportUnknownMemberType] PrometheusMetrics has no typing 89 90 @app.route("/health") 91 @metrics.do_not_track() 92 def health() -> str: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 93 """Health check endpoint.""" 94 health = self.metadata_service.get_health() 95 if health == HealthStatus.HEALTHY: 96 return "OK" 97 else: 98 return "NOT OK" 99 100 @app.route("/") 101 def root() -> str: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 102 """Root endpoint.""" 103 return "Hello, World!" 104 105 @app.route("/.well-known/openid-credential-issuer") 106 def credential_issuer_metadata() -> Response: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 107 """Credential Issuer Metadata endpoint.""" 108 metadata = self.metadata_service.get_credential_issuer_metadata() 109 return Response( 110 response=msgspec.json.encode(metadata).decode(), 111 mimetype="application/json", 112 status=200, 113 ) 114 115 @app.route("/.well-known/did.json") 116 def did_document() -> Response: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 117 """DID document endpoint for the issuer.""" 118 public_url = self.config.public_url 119 parsed = urlparse(public_url) 120 host = parsed.hostname or "localhost" 121 port = parsed.port or (443 if parsed.scheme == "https" else 80) 122 123 # Create DID from host and port 124 # For DID Web, only the port colon needs to be percent-encoded 125 # did:web: uses literal colons, but the port separator : needs to be %3A 126 did = f"did:web:{host}%3A{port}" 127 128 # The public key is hardcoded for now 129 # In production this would come from the agent 130 did_document = { 131 "@context": [ 132 "https://www.w3.org/ns/did/v1", 133 "https://w3id.org/security/suites/ed25519-2020/v1", 134 ], 135 "id": did, 136 "verificationMethod": [ 137 { 138 "id": f"{did}#key-1", 139 "type": "Ed25519VerificationKey2020", 140 "controller": did, 141 "publicKeyPem": ( 142 "-----BEGIN PUBLIC KEY-----\n" 143 "MCowBQYDK2VwAyEAX4FOGLXPUOD06/9ygJ1wyZX+qreCuuZu3xl/rB4OJXA=\n" 144 "-----END PUBLIC KEY-----" 145 ), 146 } 147 ], 148 "authentication": [f"{did}#key-1"], 149 "assertionMethod": [f"{did}#key-1"], 150 } 151 return Response( 152 response = msgspec.json.encode(did_document), 153 status = 200, 154 mimetype = "application/json" 155 ) 156 157 @app.route("/api/v1/offers", methods=["POST"]) 158 def create_offer(): # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 159 """Create a credential offer for an achievement.""" 160 try: 161 bearer_token = self._bearer_token(request) 162 except MissingTokenError: 163 return json.dumps({"error": "Unauthorized"}), 401 164 165 raw: dict[str, str] = request.get_json(silent=True) or {} 166 body = CreateOfferBody(award_id=raw.get("award_id", "")) 167 168 try: 169 offer = self.offer_service.create_offer( 170 award_id=body.award_id, 171 bearer_token=bearer_token, 172 ) 173 except PermissionDeniedError: 174 return json.dumps({"error": "Forbidden"}), 403 175 176 return json.dumps({"offer_id": offer.offer_id, "uri": offer.uri}), 201 177 178 @app.route("/credential", methods=["POST"]) 179 def request_credential(): # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 180 """Request a credential from the issuer.""" 181 try: 182 access_token = self._bearer_token(request) 183 except MissingTokenError: 184 return json.dumps({"error": "Unauthorized"}), 401 185 186 raw_json = request.get_data(as_text=True) 187 188 try: 189 body: CredentialRequestBody = msgspec.json.decode( 190 raw_json, type=CredentialRequestBody 191 ) 192 credential_response = self.credential_service.request_credential( 193 format=body.format, 194 credential_configuration_id=body.credential_configuration_id, 195 proof={"proof_type": body.proof.proof_type, "jwt": body.proof.jwt}, 196 issuer_state=body.issuer_state, 197 access_token=access_token, 198 ) 199 except msgspec.ValidationError as e: 200 return json.dumps({"error": str(e)}), 400 201 except IssuerAgentError as e: 202 return json.dumps({"error": str(e)}), 502 203 204 return msgspec.json.encode(credential_response).decode(), 200 205 206 @app.route("/nonce", methods=["POST"]) 207 def request_nonce(): # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 208 """Request a nonce from the issuer agent.""" 209 try: 210 nonce_response = self.credential_service.request_nonce() 211 except IssuerAgentError as e: 212 return json.dumps({"error": str(e)}), 502 213 214 return msgspec.json.encode(nonce_response).decode(), 200 215 216 return app 217 218 @override 219 def run(self): 220 self.flask_app.run( 221 host=self.config.server_host, port=self.config.server_port, debug=True 222 ) 223 224 def _bearer_token(self, request: Request) -> str: 225 """Extract the bearer token from the Authorization header. 226 227 Args: 228 request: The incoming Flask request. 229 230 Returns: 231 The bearer token string. 232 233 Raises: 234 MissingTokenError: When the header is absent or the token is empty. 235 """ 236 auth_header = request.authorization 237 if not auth_header or not auth_header.token: 238 raise MissingTokenError() 239 return auth_header.token
class
MissingTokenError(builtins.Exception):
24class MissingTokenError(Exception): 25 """Raised when the Authorization header is absent or contains no token."""
Raised when the Authorization header is absent or contains no token.
@dataclass
class
CreateOfferBody:
28@dataclass 29class CreateOfferBody: 30 """Parsed request body for the create offer endpoint.""" 31 32 award_id: str
Parsed request body for the create offer endpoint.
@dataclass
class
Proof:
35@dataclass 36class Proof: 37 """Proof object for credential request.""" 38 39 proof_type: str 40 jwt: str
Proof object for credential request.
@dataclass
class
CredentialRequestBody:
43@dataclass 44class CredentialRequestBody: 45 """Parsed request body for the credential endpoint.""" 46 47 format: str 48 credential_configuration_id: str 49 proof: Proof 50 issuer_state: str
Parsed request body for the credential endpoint.
CredentialRequestBody( format: str, credential_configuration_id: str, proof: Proof, issuer_state: str)
proof: Proof
53class HttpApiAdapter(ApiPort): 54 """HTTP REST API adapter""" 55 56 flask_app: Flask 57 metadata_service: MetadataService 58 offer_service: OfferService 59 credential_service: CredentialService 60 config: ConfigRepoPort 61 62 def __init__( 63 self, 64 config: ConfigRepoPort, 65 metadata_service: MetadataService, 66 offer_service: OfferService, 67 credential_service: CredentialService, 68 ): 69 """Initialise the adapter. 70 71 Args: 72 config: Application configuration. 73 metadata_service: Service for credential issuer metadata. 74 offer_service: Service for creating credential offers. 75 credential_service: Service for requesting credentials. 76 """ 77 self.metadata_service = metadata_service 78 self.offer_service = offer_service 79 self.credential_service = credential_service 80 self.config = config 81 self.flask_app = self._flask_app() 82 83 def _flask_app(self) -> Flask: 84 app = Flask("HttpApi") 85 86 # Metrics endpoint is only relevant to HttpAdapter 87 # no need for service/domain models 88 metrics: PrometheusMetrics = PrometheusMetrics(app) 89 _ = metrics.info("app_info", "Application info", version="1.0.3") # pyright: ignore[reportUnknownMemberType] PrometheusMetrics has no typing 90 91 @app.route("/health") 92 @metrics.do_not_track() 93 def health() -> str: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 94 """Health check endpoint.""" 95 health = self.metadata_service.get_health() 96 if health == HealthStatus.HEALTHY: 97 return "OK" 98 else: 99 return "NOT OK" 100 101 @app.route("/") 102 def root() -> str: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 103 """Root endpoint.""" 104 return "Hello, World!" 105 106 @app.route("/.well-known/openid-credential-issuer") 107 def credential_issuer_metadata() -> Response: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 108 """Credential Issuer Metadata endpoint.""" 109 metadata = self.metadata_service.get_credential_issuer_metadata() 110 return Response( 111 response=msgspec.json.encode(metadata).decode(), 112 mimetype="application/json", 113 status=200, 114 ) 115 116 @app.route("/.well-known/did.json") 117 def did_document() -> Response: # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 118 """DID document endpoint for the issuer.""" 119 public_url = self.config.public_url 120 parsed = urlparse(public_url) 121 host = parsed.hostname or "localhost" 122 port = parsed.port or (443 if parsed.scheme == "https" else 80) 123 124 # Create DID from host and port 125 # For DID Web, only the port colon needs to be percent-encoded 126 # did:web: uses literal colons, but the port separator : needs to be %3A 127 did = f"did:web:{host}%3A{port}" 128 129 # The public key is hardcoded for now 130 # In production this would come from the agent 131 did_document = { 132 "@context": [ 133 "https://www.w3.org/ns/did/v1", 134 "https://w3id.org/security/suites/ed25519-2020/v1", 135 ], 136 "id": did, 137 "verificationMethod": [ 138 { 139 "id": f"{did}#key-1", 140 "type": "Ed25519VerificationKey2020", 141 "controller": did, 142 "publicKeyPem": ( 143 "-----BEGIN PUBLIC KEY-----\n" 144 "MCowBQYDK2VwAyEAX4FOGLXPUOD06/9ygJ1wyZX+qreCuuZu3xl/rB4OJXA=\n" 145 "-----END PUBLIC KEY-----" 146 ), 147 } 148 ], 149 "authentication": [f"{did}#key-1"], 150 "assertionMethod": [f"{did}#key-1"], 151 } 152 return Response( 153 response = msgspec.json.encode(did_document), 154 status = 200, 155 mimetype = "application/json" 156 ) 157 158 @app.route("/api/v1/offers", methods=["POST"]) 159 def create_offer(): # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 160 """Create a credential offer for an achievement.""" 161 try: 162 bearer_token = self._bearer_token(request) 163 except MissingTokenError: 164 return json.dumps({"error": "Unauthorized"}), 401 165 166 raw: dict[str, str] = request.get_json(silent=True) or {} 167 body = CreateOfferBody(award_id=raw.get("award_id", "")) 168 169 try: 170 offer = self.offer_service.create_offer( 171 award_id=body.award_id, 172 bearer_token=bearer_token, 173 ) 174 except PermissionDeniedError: 175 return json.dumps({"error": "Forbidden"}), 403 176 177 return json.dumps({"offer_id": offer.offer_id, "uri": offer.uri}), 201 178 179 @app.route("/credential", methods=["POST"]) 180 def request_credential(): # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 181 """Request a credential from the issuer.""" 182 try: 183 access_token = self._bearer_token(request) 184 except MissingTokenError: 185 return json.dumps({"error": "Unauthorized"}), 401 186 187 raw_json = request.get_data(as_text=True) 188 189 try: 190 body: CredentialRequestBody = msgspec.json.decode( 191 raw_json, type=CredentialRequestBody 192 ) 193 credential_response = self.credential_service.request_credential( 194 format=body.format, 195 credential_configuration_id=body.credential_configuration_id, 196 proof={"proof_type": body.proof.proof_type, "jwt": body.proof.jwt}, 197 issuer_state=body.issuer_state, 198 access_token=access_token, 199 ) 200 except msgspec.ValidationError as e: 201 return json.dumps({"error": str(e)}), 400 202 except IssuerAgentError as e: 203 return json.dumps({"error": str(e)}), 502 204 205 return msgspec.json.encode(credential_response).decode(), 200 206 207 @app.route("/nonce", methods=["POST"]) 208 def request_nonce(): # pyright: ignore[reportUnusedFunction] Flask decorators aren't called by design 209 """Request a nonce from the issuer agent.""" 210 try: 211 nonce_response = self.credential_service.request_nonce() 212 except IssuerAgentError as e: 213 return json.dumps({"error": str(e)}), 502 214 215 return msgspec.json.encode(nonce_response).decode(), 200 216 217 return app 218 219 @override 220 def run(self): 221 self.flask_app.run( 222 host=self.config.server_host, port=self.config.server_port, debug=True 223 ) 224 225 def _bearer_token(self, request: Request) -> str: 226 """Extract the bearer token from the Authorization header. 227 228 Args: 229 request: The incoming Flask request. 230 231 Returns: 232 The bearer token string. 233 234 Raises: 235 MissingTokenError: When the header is absent or the token is empty. 236 """ 237 auth_header = request.authorization 238 if not auth_header or not auth_header.token: 239 raise MissingTokenError() 240 return auth_header.token
HTTP REST API adapter
HttpApiAdapter( config: src.config.config_port.ConfigRepoPort, metadata_service: src.metadata.MetadataService, offer_service: src.offers.offer_service.OfferService, credential_service: src.credentials.credential_service.CredentialService)
62 def __init__( 63 self, 64 config: ConfigRepoPort, 65 metadata_service: MetadataService, 66 offer_service: OfferService, 67 credential_service: CredentialService, 68 ): 69 """Initialise the adapter. 70 71 Args: 72 config: Application configuration. 73 metadata_service: Service for credential issuer metadata. 74 offer_service: Service for creating credential offers. 75 credential_service: Service for requesting credentials. 76 """ 77 self.metadata_service = metadata_service 78 self.offer_service = offer_service 79 self.credential_service = credential_service 80 self.config = config 81 self.flask_app = self._flask_app()
Initialise the adapter.
Args: config: Application configuration. metadata_service: Service for credential issuer metadata. offer_service: Service for creating credential offers. credential_service: Service for requesting credentials.
metadata_service: src.metadata.MetadataService
offer_service: src.offers.offer_service.OfferService
credential_service: src.credentials.credential_service.CredentialService