src.issuer_agent.ssi_agent_adapter
SSI-Agent Adapter for offer and credential operations.
1"""SSI-Agent Adapter for offer and credential operations.""" 2 3from typing import Protocol, override 4 5import msgspec 6import requests 7 8from src.config.config_port import ConfigRepoPort 9from src.credentials.credential import CredentialResponse 10from src.issuer_agent.issuer_agent_port import IssuerAgentError 11from src.issuer_agent.issuer_agent_port import IssuerAgentPort 12from src.metadata.credential_issuer_metadata import CredentialIssuerMetadata 13 14 15class SsiAgentHttpResponse(Protocol): 16 """ 17 Interface for HTTP response objects so that we can replace the actual HTTP client. 18 """ 19 20 @property 21 def status_code(self) -> int: 22 """The HTTP status code.""" 23 ... 24 25 @property 26 def content(self) -> bytes: 27 """The response content as bytes.""" 28 ... 29 30 31class SsiAgentHttpClient(Protocol): 32 """ 33 Interface for HTTP client objects so that we can replace the actual HTTP client. 34 """ 35 36 def get(self, url: str, timeout: int) -> SsiAgentHttpResponse: 37 """Send a GET request. 38 39 Args: 40 url: The URL to send the request to. 41 timeout: The timeout in seconds. 42 43 Returns: 44 A response object. 45 """ 46 ... 47 48 def post( 49 self, 50 url: str, 51 data: bytes, 52 headers: dict[str, str], 53 timeout: int, 54 ) -> SsiAgentHttpResponse: 55 """Send a POST request. 56 57 Args: 58 url: The URL to send the request to. 59 data: The request body as bytes. 60 headers: The request headers. 61 timeout: The timeout in seconds. 62 63 Returns: 64 A response object. 65 """ 66 ... 67 68 69class RequestsWrapperResponse(SsiAgentHttpResponse): 70 """Wrapper that adapts the requests.Response to our SsiAgentHttpResponse.""" 71 72 _response: requests.Response 73 74 def __init__(self, response: requests.Response) -> None: 75 """Initialize the wrapper. 76 77 Args: 78 response: The requests.Response object to wrap. 79 """ 80 self._response = response 81 82 @property 83 @override 84 def status_code(self) -> int: 85 """The HTTP status code of the response.""" 86 return self._response.status_code 87 88 @property 89 @override 90 def content(self) -> bytes: 91 """The response content as bytes.""" 92 return self._response.content 93 94 95class RequestsWrapper(SsiAgentHttpClient): 96 """Wrapper that adapts the requests module to our IssuerAgentHttpClient.""" 97 98 @override 99 def get(self, url: str, timeout: int) -> SsiAgentHttpResponse: 100 """Send a GET request using the requests library. 101 102 Args: 103 url: The URL to send the request to. 104 timeout: The timeout in seconds. 105 106 Returns: 107 A requests.Response object that implements SsiAgentHttpResponse. 108 """ 109 return RequestsWrapperResponse(requests.get(url, timeout=timeout)) 110 111 @override 112 def post( 113 self, 114 url: str, 115 data: bytes, 116 headers: dict[str, str], 117 timeout: int, 118 ) -> SsiAgentHttpResponse: 119 """Send a POST request using the requests library. 120 121 Args: 122 url: The URL to send the request to. 123 data: The request body as bytes. 124 headers: The request headers. 125 timeout: The timeout in seconds. 126 127 Returns: 128 A requests.Response object that implements SsiAgentHttpResponse. 129 """ 130 return RequestsWrapperResponse( 131 requests.post(url, data=data, headers=headers, timeout=timeout) 132 ) 133 134 135class SsiAgentAdapter(IssuerAgentPort): 136 """Proxy adapter for Credential Issuer Metadata.""" 137 138 def __init__( 139 self, 140 *, 141 config: ConfigRepoPort, 142 requests_client: SsiAgentHttpClient | None = None, 143 ) -> None: 144 """Initialize the proxy adapter. 145 146 Args: 147 config: The configuration repository. 148 requests_client: The requests client to use. 149 """ 150 self.ssi_agent_url: str = config.ssi_agent_url 151 self.ssi_agent_nonce_endpoint: str = config.ssi_agent_nonce_endpoint 152 self.ssi_agent_credential_endpoint: str = config.ssi_agent_credential_endpoint 153 self.requests_client: SsiAgentHttpClient = requests_client or RequestsWrapper() 154 self.timeout: int = 10 # Hardcoded timeout in seconds 155 156 @override 157 def credential_issuer_metadata(self) -> CredentialIssuerMetadata: 158 """Not supported - use SsiAgentMetadataAdapter for metadata.""" 159 raise NotImplementedError("Use SsiAgentMetadataAdapter for metadata") 160 161 @override 162 def credential_request( 163 self, 164 format: str, 165 credential_configuration_id: str, 166 proof: dict[str, object], 167 issuer_state: str, 168 access_token: str, 169 ) -> CredentialResponse: 170 """Proxy the credential request to the SSI agent. 171 172 Args: 173 format: The credential format. 174 credential_configuration_id: The credential configuration identifier. 175 proof: The proof object containing proof_type and jwt. 176 issuer_state: The issuer state from the offer. 177 access_token: The access token for authorization. 178 179 Returns: 180 CredentialResponse containing the issued credential(s). 181 182 Raises: 183 IssuerAgentError: When the upstream request fails. 184 """ 185 request_body = { 186 "format": format, 187 "credential_configuration_id": credential_configuration_id, 188 "proof": proof, 189 "issuer_state": issuer_state, 190 } 191 192 headers = { 193 "Content-Type": "application/json", 194 "Authorization": f"Bearer {access_token}", 195 } 196 197 # Forward the request to the issuer agent 198 response = self.requests_client.post( 199 self.ssi_agent_credential_endpoint, 200 data=msgspec.json.encode(request_body), 201 headers=headers, 202 timeout=self.timeout, 203 ) 204 205 # Handle HTTP errors from upstream 206 if 400 <= int(response.status_code) < 600: 207 raise IssuerAgentError( 208 f"Upstream error: {response.status_code} - {response.content.decode()}" 209 ) 210 211 credential_response: CredentialResponse = msgspec.json.decode( 212 response.content, type=CredentialResponse 213 ) 214 215 return credential_response 216 217 @override 218 def request_nonce(self) -> dict[str, str]: 219 """Request a nonce from the issuer agent. 220 221 Returns: 222 A dictionary containing the c_nonce. 223 224 Raises: 225 IssuerAgentError: When the upstream request fails. 226 """ 227 response = self.requests_client.post( 228 self.ssi_agent_nonce_endpoint, 229 data=b"", 230 headers={}, 231 timeout=self.timeout, 232 ) 233 234 # Handle HTTP errors from upstream 235 if 400 <= int(response.status_code) < 600: 236 raise IssuerAgentError( 237 f"Upstream error: {response.status_code} - {response.content.decode()}" 238 ) 239 240 return msgspec.json.decode(response.content, type=dict[str, str])
16class SsiAgentHttpResponse(Protocol): 17 """ 18 Interface for HTTP response objects so that we can replace the actual HTTP client. 19 """ 20 21 @property 22 def status_code(self) -> int: 23 """The HTTP status code.""" 24 ... 25 26 @property 27 def content(self) -> bytes: 28 """The response content as bytes.""" 29 ...
Interface for HTTP response objects so that we can replace the actual HTTP client.
1866def _no_init_or_replace_init(self, *args, **kwargs): 1867 cls = type(self) 1868 1869 if cls._is_protocol: 1870 raise TypeError('Protocols cannot be instantiated') 1871 1872 # Already using a custom `__init__`. No need to calculate correct 1873 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1874 if cls.__init__ is not _no_init_or_replace_init: 1875 return 1876 1877 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1878 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1879 # searches for a proper new `__init__` in the MRO. The new `__init__` 1880 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1881 # instantiation of the protocol subclass will thus use the new 1882 # `__init__` and no longer call `_no_init_or_replace_init`. 1883 for base in cls.__mro__: 1884 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1885 if init is not _no_init_or_replace_init: 1886 cls.__init__ = init 1887 break 1888 else: 1889 # should not happen 1890 cls.__init__ = object.__init__ 1891 1892 cls.__init__(self, *args, **kwargs)
32class SsiAgentHttpClient(Protocol): 33 """ 34 Interface for HTTP client objects so that we can replace the actual HTTP client. 35 """ 36 37 def get(self, url: str, timeout: int) -> SsiAgentHttpResponse: 38 """Send a GET request. 39 40 Args: 41 url: The URL to send the request to. 42 timeout: The timeout in seconds. 43 44 Returns: 45 A response object. 46 """ 47 ... 48 49 def post( 50 self, 51 url: str, 52 data: bytes, 53 headers: dict[str, str], 54 timeout: int, 55 ) -> SsiAgentHttpResponse: 56 """Send a POST request. 57 58 Args: 59 url: The URL to send the request to. 60 data: The request body as bytes. 61 headers: The request headers. 62 timeout: The timeout in seconds. 63 64 Returns: 65 A response object. 66 """ 67 ...
Interface for HTTP client objects so that we can replace the actual HTTP client.
1866def _no_init_or_replace_init(self, *args, **kwargs): 1867 cls = type(self) 1868 1869 if cls._is_protocol: 1870 raise TypeError('Protocols cannot be instantiated') 1871 1872 # Already using a custom `__init__`. No need to calculate correct 1873 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1874 if cls.__init__ is not _no_init_or_replace_init: 1875 return 1876 1877 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1878 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1879 # searches for a proper new `__init__` in the MRO. The new `__init__` 1880 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1881 # instantiation of the protocol subclass will thus use the new 1882 # `__init__` and no longer call `_no_init_or_replace_init`. 1883 for base in cls.__mro__: 1884 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1885 if init is not _no_init_or_replace_init: 1886 cls.__init__ = init 1887 break 1888 else: 1889 # should not happen 1890 cls.__init__ = object.__init__ 1891 1892 cls.__init__(self, *args, **kwargs)
37 def get(self, url: str, timeout: int) -> SsiAgentHttpResponse: 38 """Send a GET request. 39 40 Args: 41 url: The URL to send the request to. 42 timeout: The timeout in seconds. 43 44 Returns: 45 A response object. 46 """ 47 ...
Send a GET request.
Args: url: The URL to send the request to. timeout: The timeout in seconds.
Returns: A response object.
49 def post( 50 self, 51 url: str, 52 data: bytes, 53 headers: dict[str, str], 54 timeout: int, 55 ) -> SsiAgentHttpResponse: 56 """Send a POST request. 57 58 Args: 59 url: The URL to send the request to. 60 data: The request body as bytes. 61 headers: The request headers. 62 timeout: The timeout in seconds. 63 64 Returns: 65 A response object. 66 """ 67 ...
Send a POST request.
Args: url: The URL to send the request to. data: The request body as bytes. headers: The request headers. timeout: The timeout in seconds.
Returns: A response object.
70class RequestsWrapperResponse(SsiAgentHttpResponse): 71 """Wrapper that adapts the requests.Response to our SsiAgentHttpResponse.""" 72 73 _response: requests.Response 74 75 def __init__(self, response: requests.Response) -> None: 76 """Initialize the wrapper. 77 78 Args: 79 response: The requests.Response object to wrap. 80 """ 81 self._response = response 82 83 @property 84 @override 85 def status_code(self) -> int: 86 """The HTTP status code of the response.""" 87 return self._response.status_code 88 89 @property 90 @override 91 def content(self) -> bytes: 92 """The response content as bytes.""" 93 return self._response.content
Wrapper that adapts the requests.Response to our SsiAgentHttpResponse.
75 def __init__(self, response: requests.Response) -> None: 76 """Initialize the wrapper. 77 78 Args: 79 response: The requests.Response object to wrap. 80 """ 81 self._response = response
Initialize the wrapper.
Args: response: The requests.Response object to wrap.
96class RequestsWrapper(SsiAgentHttpClient): 97 """Wrapper that adapts the requests module to our IssuerAgentHttpClient.""" 98 99 @override 100 def get(self, url: str, timeout: int) -> SsiAgentHttpResponse: 101 """Send a GET request using the requests library. 102 103 Args: 104 url: The URL to send the request to. 105 timeout: The timeout in seconds. 106 107 Returns: 108 A requests.Response object that implements SsiAgentHttpResponse. 109 """ 110 return RequestsWrapperResponse(requests.get(url, timeout=timeout)) 111 112 @override 113 def post( 114 self, 115 url: str, 116 data: bytes, 117 headers: dict[str, str], 118 timeout: int, 119 ) -> SsiAgentHttpResponse: 120 """Send a POST request using the requests library. 121 122 Args: 123 url: The URL to send the request to. 124 data: The request body as bytes. 125 headers: The request headers. 126 timeout: The timeout in seconds. 127 128 Returns: 129 A requests.Response object that implements SsiAgentHttpResponse. 130 """ 131 return RequestsWrapperResponse( 132 requests.post(url, data=data, headers=headers, timeout=timeout) 133 )
Wrapper that adapts the requests module to our IssuerAgentHttpClient.
99 @override 100 def get(self, url: str, timeout: int) -> SsiAgentHttpResponse: 101 """Send a GET request using the requests library. 102 103 Args: 104 url: The URL to send the request to. 105 timeout: The timeout in seconds. 106 107 Returns: 108 A requests.Response object that implements SsiAgentHttpResponse. 109 """ 110 return RequestsWrapperResponse(requests.get(url, timeout=timeout))
Send a GET request using the requests library.
Args: url: The URL to send the request to. timeout: The timeout in seconds.
Returns: A requests.Response object that implements SsiAgentHttpResponse.
112 @override 113 def post( 114 self, 115 url: str, 116 data: bytes, 117 headers: dict[str, str], 118 timeout: int, 119 ) -> SsiAgentHttpResponse: 120 """Send a POST request using the requests library. 121 122 Args: 123 url: The URL to send the request to. 124 data: The request body as bytes. 125 headers: The request headers. 126 timeout: The timeout in seconds. 127 128 Returns: 129 A requests.Response object that implements SsiAgentHttpResponse. 130 """ 131 return RequestsWrapperResponse( 132 requests.post(url, data=data, headers=headers, timeout=timeout) 133 )
Send a POST request using the requests library.
Args: url: The URL to send the request to. data: The request body as bytes. headers: The request headers. timeout: The timeout in seconds.
Returns: A requests.Response object that implements SsiAgentHttpResponse.
Inherited Members
136class SsiAgentAdapter(IssuerAgentPort): 137 """Proxy adapter for Credential Issuer Metadata.""" 138 139 def __init__( 140 self, 141 *, 142 config: ConfigRepoPort, 143 requests_client: SsiAgentHttpClient | None = None, 144 ) -> None: 145 """Initialize the proxy adapter. 146 147 Args: 148 config: The configuration repository. 149 requests_client: The requests client to use. 150 """ 151 self.ssi_agent_url: str = config.ssi_agent_url 152 self.ssi_agent_nonce_endpoint: str = config.ssi_agent_nonce_endpoint 153 self.ssi_agent_credential_endpoint: str = config.ssi_agent_credential_endpoint 154 self.requests_client: SsiAgentHttpClient = requests_client or RequestsWrapper() 155 self.timeout: int = 10 # Hardcoded timeout in seconds 156 157 @override 158 def credential_issuer_metadata(self) -> CredentialIssuerMetadata: 159 """Not supported - use SsiAgentMetadataAdapter for metadata.""" 160 raise NotImplementedError("Use SsiAgentMetadataAdapter for metadata") 161 162 @override 163 def credential_request( 164 self, 165 format: str, 166 credential_configuration_id: str, 167 proof: dict[str, object], 168 issuer_state: str, 169 access_token: str, 170 ) -> CredentialResponse: 171 """Proxy the credential request to the SSI agent. 172 173 Args: 174 format: The credential format. 175 credential_configuration_id: The credential configuration identifier. 176 proof: The proof object containing proof_type and jwt. 177 issuer_state: The issuer state from the offer. 178 access_token: The access token for authorization. 179 180 Returns: 181 CredentialResponse containing the issued credential(s). 182 183 Raises: 184 IssuerAgentError: When the upstream request fails. 185 """ 186 request_body = { 187 "format": format, 188 "credential_configuration_id": credential_configuration_id, 189 "proof": proof, 190 "issuer_state": issuer_state, 191 } 192 193 headers = { 194 "Content-Type": "application/json", 195 "Authorization": f"Bearer {access_token}", 196 } 197 198 # Forward the request to the issuer agent 199 response = self.requests_client.post( 200 self.ssi_agent_credential_endpoint, 201 data=msgspec.json.encode(request_body), 202 headers=headers, 203 timeout=self.timeout, 204 ) 205 206 # Handle HTTP errors from upstream 207 if 400 <= int(response.status_code) < 600: 208 raise IssuerAgentError( 209 f"Upstream error: {response.status_code} - {response.content.decode()}" 210 ) 211 212 credential_response: CredentialResponse = msgspec.json.decode( 213 response.content, type=CredentialResponse 214 ) 215 216 return credential_response 217 218 @override 219 def request_nonce(self) -> dict[str, str]: 220 """Request a nonce from the issuer agent. 221 222 Returns: 223 A dictionary containing the c_nonce. 224 225 Raises: 226 IssuerAgentError: When the upstream request fails. 227 """ 228 response = self.requests_client.post( 229 self.ssi_agent_nonce_endpoint, 230 data=b"", 231 headers={}, 232 timeout=self.timeout, 233 ) 234 235 # Handle HTTP errors from upstream 236 if 400 <= int(response.status_code) < 600: 237 raise IssuerAgentError( 238 f"Upstream error: {response.status_code} - {response.content.decode()}" 239 ) 240 241 return msgspec.json.decode(response.content, type=dict[str, str])
Proxy adapter for Credential Issuer Metadata.
139 def __init__( 140 self, 141 *, 142 config: ConfigRepoPort, 143 requests_client: SsiAgentHttpClient | None = None, 144 ) -> None: 145 """Initialize the proxy adapter. 146 147 Args: 148 config: The configuration repository. 149 requests_client: The requests client to use. 150 """ 151 self.ssi_agent_url: str = config.ssi_agent_url 152 self.ssi_agent_nonce_endpoint: str = config.ssi_agent_nonce_endpoint 153 self.ssi_agent_credential_endpoint: str = config.ssi_agent_credential_endpoint 154 self.requests_client: SsiAgentHttpClient = requests_client or RequestsWrapper() 155 self.timeout: int = 10 # Hardcoded timeout in seconds
Initialize the proxy adapter.
Args: config: The configuration repository. requests_client: The requests client to use.
157 @override 158 def credential_issuer_metadata(self) -> CredentialIssuerMetadata: 159 """Not supported - use SsiAgentMetadataAdapter for metadata.""" 160 raise NotImplementedError("Use SsiAgentMetadataAdapter for metadata")
Not supported - use SsiAgentMetadataAdapter for metadata.
162 @override 163 def credential_request( 164 self, 165 format: str, 166 credential_configuration_id: str, 167 proof: dict[str, object], 168 issuer_state: str, 169 access_token: str, 170 ) -> CredentialResponse: 171 """Proxy the credential request to the SSI agent. 172 173 Args: 174 format: The credential format. 175 credential_configuration_id: The credential configuration identifier. 176 proof: The proof object containing proof_type and jwt. 177 issuer_state: The issuer state from the offer. 178 access_token: The access token for authorization. 179 180 Returns: 181 CredentialResponse containing the issued credential(s). 182 183 Raises: 184 IssuerAgentError: When the upstream request fails. 185 """ 186 request_body = { 187 "format": format, 188 "credential_configuration_id": credential_configuration_id, 189 "proof": proof, 190 "issuer_state": issuer_state, 191 } 192 193 headers = { 194 "Content-Type": "application/json", 195 "Authorization": f"Bearer {access_token}", 196 } 197 198 # Forward the request to the issuer agent 199 response = self.requests_client.post( 200 self.ssi_agent_credential_endpoint, 201 data=msgspec.json.encode(request_body), 202 headers=headers, 203 timeout=self.timeout, 204 ) 205 206 # Handle HTTP errors from upstream 207 if 400 <= int(response.status_code) < 600: 208 raise IssuerAgentError( 209 f"Upstream error: {response.status_code} - {response.content.decode()}" 210 ) 211 212 credential_response: CredentialResponse = msgspec.json.decode( 213 response.content, type=CredentialResponse 214 ) 215 216 return credential_response
Proxy the credential request to the SSI agent.
Args: format: The credential format. credential_configuration_id: The credential configuration identifier. proof: The proof object containing proof_type and jwt. issuer_state: The issuer state from the offer. access_token: The access token for authorization.
Returns: CredentialResponse containing the issued credential(s).
Raises: IssuerAgentError: When the upstream request fails.
218 @override 219 def request_nonce(self) -> dict[str, str]: 220 """Request a nonce from the issuer agent. 221 222 Returns: 223 A dictionary containing the c_nonce. 224 225 Raises: 226 IssuerAgentError: When the upstream request fails. 227 """ 228 response = self.requests_client.post( 229 self.ssi_agent_nonce_endpoint, 230 data=b"", 231 headers={}, 232 timeout=self.timeout, 233 ) 234 235 # Handle HTTP errors from upstream 236 if 400 <= int(response.status_code) < 600: 237 raise IssuerAgentError( 238 f"Upstream error: {response.status_code} - {response.content.decode()}" 239 ) 240 241 return msgspec.json.decode(response.content, type=dict[str, str])
Request a nonce from the issuer agent.
Returns: A dictionary containing the c_nonce.
Raises: IssuerAgentError: When the upstream request fails.