src.issuer_agent.ssi_agent_adapter

SSI-Agent Adapter for offer and credential operations.

  1"""SSI-Agent Adapter for offer and credential operations."""
  2
  3from typing import Protocol, override
  4
  5import msgspec
  6import requests
  7
  8from src.config.config_port import ConfigRepoPort
  9from src.credentials.credential import CredentialResponse
 10from src.issuer_agent.issuer_agent_port import IssuerAgentError
 11from src.issuer_agent.issuer_agent_port import IssuerAgentPort
 12from src.metadata.credential_issuer_metadata import CredentialIssuerMetadata
 13
 14
 15class SsiAgentHttpResponse(Protocol):
 16    """
 17    Interface for HTTP response objects so that we can replace the actual HTTP client.
 18    """
 19
 20    @property
 21    def status_code(self) -> int:
 22        """The HTTP status code."""
 23        ...
 24
 25    @property
 26    def content(self) -> bytes:
 27        """The response content as bytes."""
 28        ...
 29
 30
 31class SsiAgentHttpClient(Protocol):
 32    """
 33    Interface for HTTP client objects so that we can replace the actual HTTP client.
 34    """
 35
 36    def get(self, url: str, timeout: int) -> SsiAgentHttpResponse:
 37        """Send a GET request.
 38
 39        Args:
 40            url: The URL to send the request to.
 41            timeout: The timeout in seconds.
 42
 43        Returns:
 44            A response object.
 45        """
 46        ...
 47
 48    def post(
 49        self,
 50        url: str,
 51        data: bytes,
 52        headers: dict[str, str],
 53        timeout: int,
 54    ) -> SsiAgentHttpResponse:
 55        """Send a POST request.
 56
 57        Args:
 58            url: The URL to send the request to.
 59            data: The request body as bytes.
 60            headers: The request headers.
 61            timeout: The timeout in seconds.
 62
 63        Returns:
 64            A response object.
 65        """
 66        ...
 67
 68
 69class RequestsWrapperResponse(SsiAgentHttpResponse):
 70    """Wrapper that adapts the requests.Response to our SsiAgentHttpResponse."""
 71
 72    _response: requests.Response
 73
 74    def __init__(self, response: requests.Response) -> None:
 75        """Initialize the wrapper.
 76
 77        Args:
 78            response: The requests.Response object to wrap.
 79        """
 80        self._response = response
 81
 82    @property
 83    @override
 84    def status_code(self) -> int:
 85        """The HTTP status code of the response."""
 86        return self._response.status_code
 87
 88    @property
 89    @override
 90    def content(self) -> bytes:
 91        """The response content as bytes."""
 92        return self._response.content
 93
 94
 95class RequestsWrapper(SsiAgentHttpClient):
 96    """Wrapper that adapts the requests module to our IssuerAgentHttpClient."""
 97
 98    @override
 99    def get(self, url: str, timeout: int) -> SsiAgentHttpResponse:
100        """Send a GET request using the requests library.
101
102        Args:
103            url: The URL to send the request to.
104            timeout: The timeout in seconds.
105
106        Returns:
107            A requests.Response object that implements SsiAgentHttpResponse.
108        """
109        return RequestsWrapperResponse(requests.get(url, timeout=timeout))
110
111    @override
112    def post(
113        self,
114        url: str,
115        data: bytes,
116        headers: dict[str, str],
117        timeout: int,
118    ) -> SsiAgentHttpResponse:
119        """Send a POST request using the requests library.
120
121        Args:
122            url: The URL to send the request to.
123            data: The request body as bytes.
124            headers: The request headers.
125            timeout: The timeout in seconds.
126
127        Returns:
128            A requests.Response object that implements SsiAgentHttpResponse.
129        """
130        return RequestsWrapperResponse(
131            requests.post(url, data=data, headers=headers, timeout=timeout)
132        )
133
134
class SsiAgentAdapter(IssuerAgentPort):
    """Adapter that proxies credential and nonce requests to the SSI agent.

    Credential issuer metadata is not served by this adapter:
    ``credential_issuer_metadata`` always raises ``NotImplementedError``.
    """

    def __init__(
        self,
        *,
        config: ConfigRepoPort,
        requests_client: SsiAgentHttpClient | None = None,
        timeout: int = 10,
    ) -> None:
        """Initialize the proxy adapter.

        Args:
            config: The configuration repository; the SSI agent URL and the
                nonce and credential endpoint URLs are read from it.
            requests_client: The HTTP client used for upstream calls. A
                ``RequestsWrapper`` is created when omitted.
            timeout: Timeout in seconds applied to every upstream call.
        """
        self.ssi_agent_url: str = config.ssi_agent_url
        self.ssi_agent_nonce_endpoint: str = config.ssi_agent_nonce_endpoint
        self.ssi_agent_credential_endpoint: str = config.ssi_agent_credential_endpoint
        # Compare against None rather than truthiness so that an injected
        # client is never discarded just because it evaluates as falsy.
        self.requests_client: SsiAgentHttpClient = (
            RequestsWrapper() if requests_client is None else requests_client
        )
        self.timeout: int = timeout

    @staticmethod
    def _raise_for_upstream_error(response: SsiAgentHttpResponse) -> None:
        """Raise IssuerAgentError when the upstream answered with 4xx or 5xx.

        Args:
            response: The upstream response to inspect.

        Raises:
            IssuerAgentError: When the status code is in the range 400-599.
        """
        if 400 <= int(response.status_code) < 600:
            # Error bodies are not guaranteed to be valid UTF-8. Replacing
            # undecodable bytes keeps a UnicodeDecodeError from masking the
            # upstream failure we actually want to report.
            body = response.content.decode(errors="replace")
            raise IssuerAgentError(
                f"Upstream error: {response.status_code} - {body}"
            )

    @override
    def credential_issuer_metadata(self) -> CredentialIssuerMetadata:
        """Not supported - use SsiAgentMetadataAdapter for metadata."""
        raise NotImplementedError("Use SsiAgentMetadataAdapter for metadata")

    @override
    def credential_request(
        self,
        format: str,
        credential_configuration_id: str,
        proof: dict[str, object],
        issuer_state: str,
        access_token: str,
    ) -> CredentialResponse:
        """Proxy the credential request to the SSI agent.

        The request fields are serialized to a JSON body and POSTed to the
        configured credential endpoint with the access token sent as a
        Bearer ``Authorization`` header.

        Args:
            format: The credential format.
            credential_configuration_id: The credential configuration identifier.
            proof: The proof object, forwarded unchanged in the request body.
            issuer_state: The issuer state, forwarded in the request body.
            access_token: The access token for authorization.

        Returns:
            The upstream response body decoded as a CredentialResponse.

        Raises:
            IssuerAgentError: When the upstream answers with a 4xx or 5xx
                status. Exceptions raised by the HTTP client itself or by
                msgspec while decoding the body are not translated and
                propagate unchanged.
        """
        request_body = {
            "format": format,
            "credential_configuration_id": credential_configuration_id,
            "proof": proof,
            "issuer_state": issuer_state,
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {access_token}",
        }

        # Forward the request to the issuer agent
        response = self.requests_client.post(
            self.ssi_agent_credential_endpoint,
            data=msgspec.json.encode(request_body),
            headers=headers,
            timeout=self.timeout,
        )

        self._raise_for_upstream_error(response)

        credential_response: CredentialResponse = msgspec.json.decode(
            response.content, type=CredentialResponse
        )

        return credential_response

    @override
    def request_nonce(self) -> dict[str, str]:
        """Request a nonce from the issuer agent.

        Sends an empty-bodied POST without headers to the configured nonce
        endpoint.

        Returns:
            The upstream JSON body decoded as a string-to-string dictionary
            (expected to contain the c_nonce).

        Raises:
            IssuerAgentError: When the upstream answers with a 4xx or 5xx
                status. Exceptions raised by the HTTP client itself or by
                msgspec while decoding the body are not translated and
                propagate unchanged.
        """
        response = self.requests_client.post(
            self.ssi_agent_nonce_endpoint,
            data=b"",
            headers={},
            timeout=self.timeout,
        )

        self._raise_for_upstream_error(response)

        return msgspec.json.decode(response.content, type=dict[str, str])
class SsiAgentHttpResponse(typing.Protocol):
16class SsiAgentHttpResponse(Protocol):
17    """
18    Interface for HTTP response objects so that we can replace the actual HTTP client.
19    """
20
21    @property
22    def status_code(self) -> int:
23        """The HTTP status code."""
24        ...
25
26    @property
27    def content(self) -> bytes:
28        """The response content as bytes."""
29        ...

Interface for HTTP response objects so that we can replace the actual HTTP client.

SsiAgentHttpResponse(*args, **kwargs)
1866def _no_init_or_replace_init(self, *args, **kwargs):
1867    cls = type(self)
1868
1869    if cls._is_protocol:
1870        raise TypeError('Protocols cannot be instantiated')
1871
1872    # Already using a custom `__init__`. No need to calculate correct
1873    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1874    if cls.__init__ is not _no_init_or_replace_init:
1875        return
1876
1877    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1878    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1879    # searches for a proper new `__init__` in the MRO. The new `__init__`
1880    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1881    # instantiation of the protocol subclass will thus use the new
1882    # `__init__` and no longer call `_no_init_or_replace_init`.
1883    for base in cls.__mro__:
1884        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1885        if init is not _no_init_or_replace_init:
1886            cls.__init__ = init
1887            break
1888    else:
1889        # should not happen
1890        cls.__init__ = object.__init__
1891
1892    cls.__init__(self, *args, **kwargs)
status_code: int
21    @property
22    def status_code(self) -> int:
23        """The HTTP status code."""
24        ...

The HTTP status code.

content: bytes
26    @property
27    def content(self) -> bytes:
28        """The response content as bytes."""
29        ...

The response content as bytes.

class SsiAgentHttpClient(typing.Protocol):
32class SsiAgentHttpClient(Protocol):
33    """
34    Interface for HTTP client objects so that we can replace the actual HTTP client.
35    """
36
37    def get(self, url: str, timeout: int) -> SsiAgentHttpResponse:
38        """Send a GET request.
39
40        Args:
41            url: The URL to send the request to.
42            timeout: The timeout in seconds.
43
44        Returns:
45            A response object.
46        """
47        ...
48
49    def post(
50        self,
51        url: str,
52        data: bytes,
53        headers: dict[str, str],
54        timeout: int,
55    ) -> SsiAgentHttpResponse:
56        """Send a POST request.
57
58        Args:
59            url: The URL to send the request to.
60            data: The request body as bytes.
61            headers: The request headers.
62            timeout: The timeout in seconds.
63
64        Returns:
65            A response object.
66        """
67        ...

Interface for HTTP client objects so that we can replace the actual HTTP client.

SsiAgentHttpClient(*args, **kwargs)
1866def _no_init_or_replace_init(self, *args, **kwargs):
1867    cls = type(self)
1868
1869    if cls._is_protocol:
1870        raise TypeError('Protocols cannot be instantiated')
1871
1872    # Already using a custom `__init__`. No need to calculate correct
1873    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1874    if cls.__init__ is not _no_init_or_replace_init:
1875        return
1876
1877    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1878    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1879    # searches for a proper new `__init__` in the MRO. The new `__init__`
1880    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1881    # instantiation of the protocol subclass will thus use the new
1882    # `__init__` and no longer call `_no_init_or_replace_init`.
1883    for base in cls.__mro__:
1884        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1885        if init is not _no_init_or_replace_init:
1886            cls.__init__ = init
1887            break
1888    else:
1889        # should not happen
1890        cls.__init__ = object.__init__
1891
1892    cls.__init__(self, *args, **kwargs)
def get( self, url: str, timeout: int) -> SsiAgentHttpResponse:
37    def get(self, url: str, timeout: int) -> SsiAgentHttpResponse:
38        """Send a GET request.
39
40        Args:
41            url: The URL to send the request to.
42            timeout: The timeout in seconds.
43
44        Returns:
45            A response object.
46        """
47        ...

Send a GET request.

Args: url: The URL to send the request to. timeout: The timeout in seconds.

Returns: A response object.

def post( self, url: str, data: bytes, headers: dict[str, str], timeout: int) -> SsiAgentHttpResponse:
49    def post(
50        self,
51        url: str,
52        data: bytes,
53        headers: dict[str, str],
54        timeout: int,
55    ) -> SsiAgentHttpResponse:
56        """Send a POST request.
57
58        Args:
59            url: The URL to send the request to.
60            data: The request body as bytes.
61            headers: The request headers.
62            timeout: The timeout in seconds.
63
64        Returns:
65            A response object.
66        """
67        ...

Send a POST request.

Args: url: The URL to send the request to. data: The request body as bytes. headers: The request headers. timeout: The timeout in seconds.

Returns: A response object.

class RequestsWrapperResponse(SsiAgentHttpResponse):
70class RequestsWrapperResponse(SsiAgentHttpResponse):
71    """Wrapper that adapts the requests.Response to our SsiAgentHttpResponse."""
72
73    _response: requests.Response
74
75    def __init__(self, response: requests.Response) -> None:
76        """Initialize the wrapper.
77
78        Args:
79            response: The requests.Response object to wrap.
80        """
81        self._response = response
82
83    @property
84    @override
85    def status_code(self) -> int:
86        """The HTTP status code of the response."""
87        return self._response.status_code
88
89    @property
90    @override
91    def content(self) -> bytes:
92        """The response content as bytes."""
93        return self._response.content

Wrapper that adapts the requests.Response to our SsiAgentHttpResponse.

RequestsWrapperResponse(response: requests.models.Response)
75    def __init__(self, response: requests.Response) -> None:
76        """Initialize the wrapper.
77
78        Args:
79            response: The requests.Response object to wrap.
80        """
81        self._response = response

Initialize the wrapper.

Args: response: The requests.Response object to wrap.

status_code: int
83    @property
84    @override
85    def status_code(self) -> int:
86        """The HTTP status code of the response."""
87        return self._response.status_code

The HTTP status code of the response.

content: bytes
89    @property
90    @override
91    def content(self) -> bytes:
92        """The response content as bytes."""
93        return self._response.content

The response content as bytes.

class RequestsWrapper(SsiAgentHttpClient):
 96class RequestsWrapper(SsiAgentHttpClient):
 97    """Wrapper that adapts the requests module to our IssuerAgentHttpClient."""
 98
 99    @override
100    def get(self, url: str, timeout: int) -> SsiAgentHttpResponse:
101        """Send a GET request using the requests library.
102
103        Args:
104            url: The URL to send the request to.
105            timeout: The timeout in seconds.
106
107        Returns:
108            A requests.Response object that implements SsiAgentHttpResponse.
109        """
110        return RequestsWrapperResponse(requests.get(url, timeout=timeout))
111
112    @override
113    def post(
114        self,
115        url: str,
116        data: bytes,
117        headers: dict[str, str],
118        timeout: int,
119    ) -> SsiAgentHttpResponse:
120        """Send a POST request using the requests library.
121
122        Args:
123            url: The URL to send the request to.
124            data: The request body as bytes.
125            headers: The request headers.
126            timeout: The timeout in seconds.
127
128        Returns:
129            A requests.Response object that implements SsiAgentHttpResponse.
130        """
131        return RequestsWrapperResponse(
132            requests.post(url, data=data, headers=headers, timeout=timeout)
133        )

Wrapper that adapts the requests module to our SsiAgentHttpClient.

@override
def get( self, url: str, timeout: int) -> SsiAgentHttpResponse:
 99    @override
100    def get(self, url: str, timeout: int) -> SsiAgentHttpResponse:
101        """Send a GET request using the requests library.
102
103        Args:
104            url: The URL to send the request to.
105            timeout: The timeout in seconds.
106
107        Returns:
108            A requests.Response object that implements SsiAgentHttpResponse.
109        """
110        return RequestsWrapperResponse(requests.get(url, timeout=timeout))

Send a GET request using the requests library.

Args: url: The URL to send the request to. timeout: The timeout in seconds.

Returns: A RequestsWrapperResponse that wraps the requests.Response and implements SsiAgentHttpResponse.

@override
def post( self, url: str, data: bytes, headers: dict[str, str], timeout: int) -> SsiAgentHttpResponse:
112    @override
113    def post(
114        self,
115        url: str,
116        data: bytes,
117        headers: dict[str, str],
118        timeout: int,
119    ) -> SsiAgentHttpResponse:
120        """Send a POST request using the requests library.
121
122        Args:
123            url: The URL to send the request to.
124            data: The request body as bytes.
125            headers: The request headers.
126            timeout: The timeout in seconds.
127
128        Returns:
129            A requests.Response object that implements SsiAgentHttpResponse.
130        """
131        return RequestsWrapperResponse(
132            requests.post(url, data=data, headers=headers, timeout=timeout)
133        )

Send a POST request using the requests library.

Args: url: The URL to send the request to. data: The request body as bytes. headers: The request headers. timeout: The timeout in seconds.

Returns: A RequestsWrapperResponse that wraps the requests.Response and implements SsiAgentHttpResponse.

class SsiAgentAdapter(src.issuer_agent.issuer_agent_port.IssuerAgentPort):
136class SsiAgentAdapter(IssuerAgentPort):
137    """Proxy adapter for Credential Issuer Metadata."""
138
139    def __init__(
140        self,
141        *,
142        config: ConfigRepoPort,
143        requests_client: SsiAgentHttpClient | None = None,
144    ) -> None:
145        """Initialize the proxy adapter.
146
147        Args:
148            config: The configuration repository.
149            requests_client: The requests client to use.
150        """
151        self.ssi_agent_url: str = config.ssi_agent_url
152        self.ssi_agent_nonce_endpoint: str = config.ssi_agent_nonce_endpoint
153        self.ssi_agent_credential_endpoint: str = config.ssi_agent_credential_endpoint
154        self.requests_client: SsiAgentHttpClient = requests_client or RequestsWrapper()
155        self.timeout: int = 10  # Hardcoded timeout in seconds
156
157    @override
158    def credential_issuer_metadata(self) -> CredentialIssuerMetadata:
159        """Not supported - use SsiAgentMetadataAdapter for metadata."""
160        raise NotImplementedError("Use SsiAgentMetadataAdapter for metadata")
161
162    @override
163    def credential_request(
164        self,
165        format: str,
166        credential_configuration_id: str,
167        proof: dict[str, object],
168        issuer_state: str,
169        access_token: str,
170    ) -> CredentialResponse:
171        """Proxy the credential request to the SSI agent.
172
173        Args:
174            format: The credential format.
175            credential_configuration_id: The credential configuration identifier.
176            proof: The proof object containing proof_type and jwt.
177            issuer_state: The issuer state from the offer.
178            access_token: The access token for authorization.
179
180        Returns:
181            CredentialResponse containing the issued credential(s).
182
183        Raises:
184            IssuerAgentError: When the upstream request fails.
185        """
186        request_body = {
187            "format": format,
188            "credential_configuration_id": credential_configuration_id,
189            "proof": proof,
190            "issuer_state": issuer_state,
191        }
192
193        headers = {
194            "Content-Type": "application/json",
195            "Authorization": f"Bearer {access_token}",
196        }
197
198        # Forward the request to the issuer agent
199        response = self.requests_client.post(
200            self.ssi_agent_credential_endpoint,
201            data=msgspec.json.encode(request_body),
202            headers=headers,
203            timeout=self.timeout,
204        )
205
206        # Handle HTTP errors from upstream
207        if 400 <= int(response.status_code) < 600:
208            raise IssuerAgentError(
209                f"Upstream error: {response.status_code} - {response.content.decode()}"
210            )
211
212        credential_response: CredentialResponse = msgspec.json.decode(
213            response.content, type=CredentialResponse
214        )
215
216        return credential_response
217
218    @override
219    def request_nonce(self) -> dict[str, str]:
220        """Request a nonce from the issuer agent.
221
222        Returns:
223            A dictionary containing the c_nonce.
224
225        Raises:
226            IssuerAgentError: When the upstream request fails.
227        """
228        response = self.requests_client.post(
229            self.ssi_agent_nonce_endpoint,
230            data=b"",
231            headers={},
232            timeout=self.timeout,
233        )
234
235        # Handle HTTP errors from upstream
236        if 400 <= int(response.status_code) < 600:
237            raise IssuerAgentError(
238                f"Upstream error: {response.status_code} - {response.content.decode()}"
239            )
240
241        return msgspec.json.decode(response.content, type=dict[str, str])

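Proxy adapter that forwards credential and nonce requests to the SSI agent; credential issuer metadata is not supported here (use SsiAgentMetadataAdapter).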

SsiAgentAdapter( *, config: src.config.config_port.ConfigRepoPort, requests_client: SsiAgentHttpClient | None = None)
139    def __init__(
140        self,
141        *,
142        config: ConfigRepoPort,
143        requests_client: SsiAgentHttpClient | None = None,
144    ) -> None:
145        """Initialize the proxy adapter.
146
147        Args:
148            config: The configuration repository.
149            requests_client: The requests client to use.
150        """
151        self.ssi_agent_url: str = config.ssi_agent_url
152        self.ssi_agent_nonce_endpoint: str = config.ssi_agent_nonce_endpoint
153        self.ssi_agent_credential_endpoint: str = config.ssi_agent_credential_endpoint
154        self.requests_client: SsiAgentHttpClient = requests_client or RequestsWrapper()
155        self.timeout: int = 10  # Hardcoded timeout in seconds

Initialize the proxy adapter.

Args: config: The configuration repository. requests_client: The requests client to use.

ssi_agent_url: str
ssi_agent_nonce_endpoint: str
ssi_agent_credential_endpoint: str
requests_client: SsiAgentHttpClient
timeout: int
@override
def credential_issuer_metadata(self) -> src.metadata.CredentialIssuerMetadata:
157    @override
158    def credential_issuer_metadata(self) -> CredentialIssuerMetadata:
159        """Not supported - use SsiAgentMetadataAdapter for metadata."""
160        raise NotImplementedError("Use SsiAgentMetadataAdapter for metadata")

Not supported - use SsiAgentMetadataAdapter for metadata.

@override
def credential_request( self, format: str, credential_configuration_id: str, proof: dict[str, object], issuer_state: str, access_token: str) -> src.credentials.credential.CredentialResponse:
162    @override
163    def credential_request(
164        self,
165        format: str,
166        credential_configuration_id: str,
167        proof: dict[str, object],
168        issuer_state: str,
169        access_token: str,
170    ) -> CredentialResponse:
171        """Proxy the credential request to the SSI agent.
172
173        Args:
174            format: The credential format.
175            credential_configuration_id: The credential configuration identifier.
176            proof: The proof object containing proof_type and jwt.
177            issuer_state: The issuer state from the offer.
178            access_token: The access token for authorization.
179
180        Returns:
181            CredentialResponse containing the issued credential(s).
182
183        Raises:
184            IssuerAgentError: When the upstream request fails.
185        """
186        request_body = {
187            "format": format,
188            "credential_configuration_id": credential_configuration_id,
189            "proof": proof,
190            "issuer_state": issuer_state,
191        }
192
193        headers = {
194            "Content-Type": "application/json",
195            "Authorization": f"Bearer {access_token}",
196        }
197
198        # Forward the request to the issuer agent
199        response = self.requests_client.post(
200            self.ssi_agent_credential_endpoint,
201            data=msgspec.json.encode(request_body),
202            headers=headers,
203            timeout=self.timeout,
204        )
205
206        # Handle HTTP errors from upstream
207        if 400 <= int(response.status_code) < 600:
208            raise IssuerAgentError(
209                f"Upstream error: {response.status_code} - {response.content.decode()}"
210            )
211
212        credential_response: CredentialResponse = msgspec.json.decode(
213            response.content, type=CredentialResponse
214        )
215
216        return credential_response

Proxy the credential request to the SSI agent.

Args: format: The credential format. credential_configuration_id: The credential configuration identifier. proof: The proof object containing proof_type and jwt. issuer_state: The issuer state from the offer. access_token: The access token for authorization.

Returns: CredentialResponse containing the issued credential(s).

Raises: IssuerAgentError: When the upstream request fails.

@override
def request_nonce(self) -> dict[str, str]:
218    @override
219    def request_nonce(self) -> dict[str, str]:
220        """Request a nonce from the issuer agent.
221
222        Returns:
223            A dictionary containing the c_nonce.
224
225        Raises:
226            IssuerAgentError: When the upstream request fails.
227        """
228        response = self.requests_client.post(
229            self.ssi_agent_nonce_endpoint,
230            data=b"",
231            headers={},
232            timeout=self.timeout,
233        )
234
235        # Handle HTTP errors from upstream
236        if 400 <= int(response.status_code) < 600:
237            raise IssuerAgentError(
238                f"Upstream error: {response.status_code} - {response.content.decode()}"
239            )
240
241        return msgspec.json.decode(response.content, type=dict[str, str])

Request a nonce from the issuer agent.

Returns: A dictionary containing the c_nonce.

Raises: IssuerAgentError: When the upstream request fails.