Skip to content

Auth Validator

AuthValidator

AuthValidator()

Dependency that validates the access token and returns the AuthInfo.

Initialize JWKS client with configurable TLS settings.

Source code in bitonicai/internal/authentication/auth_validator.py
def __init__(self):
    """Initialize JWKS client with configurable TLS settings."""
    self.logto_config = logto_config
    self.well_known_url = f"{self.logto_config.uri}/oidc/.well-known/openid-configuration"

    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    self.jwks_client = PyJWKClient(
        f"{self.logto_config.uri}/oidc/jwks", ssl_context=ssl_context
    )

Functions

verify_access_token async
verify_access_token(credentials)

Validate an incoming bearer token using JWKS.

Parameters:

Name Type Description Default
credentials HTTPAuthorizationCredentials

Bearer token provided via HTTPAuthorizationCredentials.

required

Returns:

Type Description
AuthInfo

AuthInfo parsed from the validated JWT.

Raises:

Type Description
HTTPException

401 when credentials are missing; 500 on validation errors.

Source code in bitonicai/internal/authentication/auth_validator.py
async def verify_access_token(self, credentials: HTTPAuthorizationCredentials) -> AuthInfo:
    """Validate an incoming bearer token using JWKS.

    Args:
        credentials: Bearer token provided via HTTPAuthorizationCredentials.

    Returns:
        AuthInfo parsed from the validated JWT.

    Raises:
        HTTPException: 401 when credentials are missing; 500 on validation errors.
    """
    try:
        if credentials is None or not credentials.credentials:
            raise HTTPException(status_code=401, detail="No credentials provided")

        token = credentials.credentials
        payload = self.jwks_client.get_signing_key_from_jwt(token)
        payload = jwt.decode(
            token,
            payload.key,
            algorithms=[payload.algorithm_name],
            options={"verify_aud": False},
            issuer=self.logto_config.issuer,
        )
        payload = await self.verify_payload(payload)
        return payload
    except AuthorizationError as e:
        raise HTTPException(status_code=e.status, detail=e.message)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
verify_payload async
verify_payload(payload)

Verify audience and scopes, returning a structured AuthInfo.

Source code in bitonicai/internal/authentication/auth_validator.py
async def verify_payload(self, payload: dict) -> AuthInfo:
    """Verify audience and scopes, returning a structured AuthInfo."""
    try:
        audience = payload.get("aud", [])
        scopes = payload.get("scope", [])
        if isinstance(audience, str):
            audience = [audience]

        if isinstance(scopes, str):
            scopes = scopes.split(" ") if scopes else []

        if not verify_audience(audience):
            raise HTTPException(status_code=401, detail="Invalid audience")

        return AuthInfo(
            sub=payload.get("sub", ""),
            client_id=payload.get("client_id", ""),
            organization_id=payload.get("organization_id", ""),
            scopes=scopes,
            audience=audience,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
    except AttributeError:
        raise HTTPException(status_code=401, detail="No credentials provided")
get_well_known_config async
get_well_known_config()

Gets the well-known configuration from the Logto server.

Source code in bitonicai/internal/authentication/auth_validator.py
async def get_well_known_config(self) -> dict:
    """
    Gets the well-known configuration from the Logto server.
    """

    try:
        async with httpx.AsyncClient(verify=False) as client:
            response = await client.get(
                self.well_known_url, headers={"Accept": "application/json"}
            )
            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"HTTP error: {response.status_code} {response.text}",
                )
            return response.json()
    except httpx.HTTPStatusError as e:
        raise HTTPException(
            status_code=e.response.status_code,
            detail=f"HTTP error: {e.response.status_code} {e.response.text}",
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
introspect_token async
introspect_token(token)

Introspects the token and returns the payload.

Source code in bitonicai/internal/authentication/auth_validator.py
async def introspect_token(self, token: str) -> dict:
    """
    Introspects the token and returns the payload.
    """

    try:
        well_known_config = await self.get_well_known_config()
        introspection_url = well_known_config["introspection_endpoint"]
        async with httpx.AsyncClient(verify=False) as client:
            response = await client.post(
                introspection_url,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                data={
                    "token": token,
                    "client_id": self.logto_config.client_id,
                    "client_secret": self.logto_config.client_secret,
                },
            )
            return response.json()
    except httpx.HTTPStatusError as e:
        raise HTTPException(
            status_code=e.response.status_code,
            detail=f"HTTP error: {e.response.status_code} {e.response.text}",
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")