Dependency that validates the access token and returns the AuthInfo.
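Initialize the JWKS client and the OIDC discovery URL. Note: despite earlier wording about "configurable TLS settings", the SSL context used for the JWKS client currently has hostname checking and certificate verification disabled.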
Source code in bitonicai/internal/authentication/auth_validator.py
def __init__(self):
    """Store the Logto config and build the discovery URL and JWKS client.

    NOTE(review): the SSL context created here disables hostname checking and
    certificate verification for JWKS retrieval. Confirm this is intended
    outside of local development.
    """
    self.logto_config = logto_config
    oidc_base = f"{self.logto_config.uri}/oidc"
    self.well_known_url = f"{oidc_base}/.well-known/openid-configuration"

    # check_hostname has to be cleared before verify_mode may become CERT_NONE.
    tls_context = ssl.create_default_context()
    tls_context.check_hostname = False
    tls_context.verify_mode = ssl.CERT_NONE

    self.jwks_client = PyJWKClient(f"{oidc_base}/jwks", ssl_context=tls_context)
|
Functions
verify_access_token
async
verify_access_token(credentials)
Validate an incoming bearer token using JWKS.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| credentials | HTTPAuthorizationCredentials | Bearer token provided via HTTPAuthorizationCredentials. | required |
Returns:
| Type | Description |
| --- | --- |
| AuthInfo | AuthInfo parsed from the validated JWT. |
Raises:
| Type | Description |
| --- | --- |
| HTTPException | 401 when credentials are missing; 500 on validation errors. |
Source code in bitonicai/internal/authentication/auth_validator.py
async def verify_access_token(self, credentials: HTTPAuthorizationCredentials) -> AuthInfo:
    """Validate an incoming bearer token using JWKS.

    Args:
        credentials: Bearer token provided via HTTPAuthorizationCredentials.

    Returns:
        AuthInfo parsed from the validated JWT.

    Raises:
        HTTPException: 401 when credentials are missing; the status carried by
            an AuthorizationError; 500 on any other validation error.
    """
    try:
        if credentials is None or not credentials.credentials:
            raise HTTPException(status_code=401, detail="No credentials provided")
        token = credentials.credentials
        # Resolve the signing key that matches the token header via the JWKS client.
        signing_key = self.jwks_client.get_signing_key_from_jwt(token)
        claims = jwt.decode(
            token,
            signing_key.key,
            algorithms=[signing_key.algorithm_name],
            # Audience is not checked by jwt.decode; verify_payload handles it.
            options={"verify_aud": False},
            issuer=self.logto_config.issuer,
        )
        return await self.verify_payload(claims)
    except AuthorizationError as e:
        raise HTTPException(status_code=e.status, detail=e.message) from e
    except HTTPException:
        # Already carries the intended status (e.g. the 401 above); without this
        # clause the generic handler below would rewrite it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
|
verify_payload
async
Verify audience and scopes, returning a structured AuthInfo.
Source code in bitonicai/internal/authentication/auth_validator.py
async def verify_payload(self, payload: dict) -> AuthInfo:
    """Verify audience and scopes, returning a structured AuthInfo.

    Args:
        payload: Decoded JWT claims. "aud" and "scope" may each be either a
            string or a list; a space-separated scope string is split.

    Returns:
        AuthInfo built from the "sub", "client_id" and "organization_id"
        claims together with the normalised scopes and audience.

    Raises:
        HTTPException: 401 when the audience is rejected or when the payload
            has no ``get`` attribute (e.g. it is None); 500 on any other error.
    """
    try:
        audience = payload.get("aud", [])
        scopes = payload.get("scope", [])
        if isinstance(audience, str):
            audience = [audience]
        if isinstance(scopes, str):
            scopes = scopes.split(" ") if scopes else []
        if not verify_audience(audience):
            raise HTTPException(status_code=401, detail="Invalid audience")
        return AuthInfo(
            sub=payload.get("sub", ""),
            client_id=payload.get("client_id", ""),
            organization_id=payload.get("organization_id", ""),
            scopes=scopes,
            audience=audience,
        )
    except HTTPException:
        # Keep the 401 raised above instead of letting it be rewritten as a 500.
        raise
    except AttributeError as e:
        # Must precede the generic handler; listed after it, this branch could
        # never run.
        raise HTTPException(status_code=401, detail="No credentials provided") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
|
get_well_known_config
async
Gets the well-known configuration from the Logto server.
Source code in bitonicai/internal/authentication/auth_validator.py
async def get_well_known_config(self) -> dict:
    """Get the well-known (OIDC discovery) configuration from the Logto server.

    Returns:
        The parsed JSON body of the response from ``self.well_known_url``.

    Raises:
        HTTPException: with the upstream status code when the server does not
            answer 200; 500 on any other failure (network error, invalid JSON).
    """
    try:
        # NOTE(review): verify=False turns off TLS certificate verification for
        # this request. Confirm this is intended outside local development.
        async with httpx.AsyncClient(verify=False) as client:
            response = await client.get(
                self.well_known_url, headers={"Accept": "application/json"}
            )
        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"HTTP error: {response.status_code} {response.text}",
            )
        return response.json()
    except HTTPException:
        # Preserve the upstream status raised above; the generic handler below
        # would otherwise turn it into a 500.
        raise
    except httpx.HTTPStatusError as e:
        raise HTTPException(
            status_code=e.response.status_code,
            detail=f"HTTP error: {e.response.status_code} {e.response.text}",
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
|
introspect_token
async
Introspects the token and returns the payload.
Source code in bitonicai/internal/authentication/auth_validator.py
async def introspect_token(self, token: str) -> dict:
    """Introspect the token and return the payload.

    The introspection endpoint is read from the well-known configuration and
    called with the token plus the configured client id and secret as form
    data.

    Args:
        token: The token string to introspect.

    Returns:
        The parsed JSON body of the introspection response.

    Raises:
        HTTPException: with the upstream status code when discovery or the
            introspection endpoint answers with an error status; 500 on any
            other failure (e.g. missing "introspection_endpoint", invalid JSON).
    """
    try:
        well_known_config = await self.get_well_known_config()
        introspection_url = well_known_config["introspection_endpoint"]
        # NOTE(review): verify=False turns off TLS certificate verification for
        # this request. Confirm this is intended outside local development.
        async with httpx.AsyncClient(verify=False) as client:
            response = await client.post(
                introspection_url,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                data={
                    "token": token,
                    "client_id": self.logto_config.client_id,
                    "client_secret": self.logto_config.client_secret,
                },
            )
        # Surface error statuses through the HTTPStatusError handler below
        # instead of returning an error body as if it were a valid result.
        response.raise_for_status()
        return response.json()
    except HTTPException:
        # Raised by get_well_known_config; keep its status rather than
        # rewriting it as a 500.
        raise
    except httpx.HTTPStatusError as e:
        raise HTTPException(
            status_code=e.response.status_code,
            detail=f"HTTP error: {e.response.status_code} {e.response.text}",
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
|