Module piate.api.authentication
Expand source code
from abc import abstractmethod
from dataclasses import dataclass
from typing import Dict, NewType
from datetime import datetime, timedelta
import requests
from dataclasses_json import dataclass_json, Undefined
from piate.api.version import APIVersion
RefreshToken = NewType("RefreshToken", str)
@dataclass_json(undefined=Undefined.RAISE)
@dataclass
class ExpirationBase:
    """Expiry timestamps for the tokens of an authenticated session.

    The ``dataclass_json`` decorator is configured with ``Undefined.RAISE``,
    so decoding a payload that carries unknown keys fails instead of silently
    dropping them.
    """

    # Moment after which the access token must no longer be used.
    access: datetime
    # Moment after which the refresh token must no longer be used.
    refresh: datetime

    def access_expired(self) -> bool:
        """Return True once the access expiry time has been reached."""
        # "Expired" means the current time is at or past the stored expiry.
        # ``now`` is built with the stored value's tzinfo so that naive and
        # timezone-aware timestamps (decoded values may be aware) both
        # compare without raising TypeError.
        return datetime.now(tz=self.access.tzinfo) >= self.access

    def refresh_expired(self) -> bool:
        """Return True once the refresh expiry time has been reached."""
        return datetime.now(tz=self.refresh.tzinfo) >= self.refresh
class AuthenticationBase:
    """Common OAuth2 token requests shared by the versioned authenticators.

    ``API_VERSION`` is only declared here; a concrete value has to be supplied
    elsewhere (presumably by the ``v1``/``v2`` sub-modules -- verify) before
    any method is called.

    NOTE: ``token`` and ``extends`` are tagged ``@abstractmethod`` yet carry
    working bodies, and this class does not use ``ABCMeta``, so the decorator
    does not by itself prevent instantiation.
    """

    PATH = "/oauth2/token"
    TOKEN_GRANT_TYPE = "password"
    EXTENDS_GRANT_TYPE = "refresh_token"
    API_VERSION: APIVersion

    def _generate_expiration(self) -> Dict:
        """Return expiry timestamps computed from the local clock.

        ``access`` is now + 3 hours and ``refresh`` is now + 12 hours. For
        ``APIVersion.V2`` an additional ``id`` entry (now + 3 hours) is set.
        """
        now = datetime.now()
        expiration = {
            "access": now + timedelta(hours=3),
            "refresh": now + timedelta(hours=12),
        }
        if self.API_VERSION == APIVersion.V2:
            expiration["id"] = now + timedelta(hours=3)
        return expiration

    @abstractmethod
    def token(self, base_url: str, username: str, password: str) -> Dict:
        """Request a new token set with the password grant.

        POSTs to ``base_url + PATH`` with the credentials sent as query
        parameters, decodes the JSON reply and adds an ``expiration`` entry
        produced by ``_generate_expiration``.

        Returns:
            The decoded JSON reply, extended with the ``expiration`` key.
        """
        response = requests.post(
            f"{base_url}{self.PATH}",
            params={
                "grant_type": self.TOKEN_GRANT_TYPE,
                "username": username,
                "password": password,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()
        response["expiration"] = self._generate_expiration()
        return response

    @abstractmethod
    def extends(self, base_url: str, refresh_token: str) -> Dict:
        """Request a fresh token set with the refresh-token grant.

        POSTs to ``base_url + PATH`` with the refresh token sent as a query
        parameter, decodes the JSON reply and adds an ``expiration`` entry
        produced by ``_generate_expiration``.

        Returns:
            The decoded JSON reply, extended with the ``expiration`` key.
        """
        response = requests.post(
            f"{base_url}{self.PATH}",
            # The keyword must be ``params``; a misspelt keyword makes
            # ``requests.post`` raise TypeError before any request is sent.
            params={
                "grant_type": self.EXTENDS_GRANT_TYPE,
                "refresh_token": refresh_token,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()
        response["expiration"] = self._generate_expiration()
        return response
Sub-modules
piate.api.authentication.v1
piate.api.authentication.v2
Classes
class AuthenticationBase-
Expand source code
class AuthenticationBase:
    """Common OAuth2 token requests shared by the versioned authenticators.

    ``API_VERSION`` is only declared here; a concrete value has to be supplied
    elsewhere (presumably by the ``v1``/``v2`` sub-modules -- verify) before
    any method is called.
    """

    PATH = "/oauth2/token"
    TOKEN_GRANT_TYPE = "password"
    EXTENDS_GRANT_TYPE = "refresh_token"
    API_VERSION: APIVersion

    def _generate_expiration(self) -> Dict:
        """Return expiry timestamps computed from the local clock.

        ``access`` is now + 3 hours and ``refresh`` is now + 12 hours. For
        ``APIVersion.V2`` an additional ``id`` entry (now + 3 hours) is set.
        """
        now = datetime.now()
        expiration = {
            "access": now + timedelta(hours=3),
            "refresh": now + timedelta(hours=12),
        }
        if self.API_VERSION == APIVersion.V2:
            expiration["id"] = now + timedelta(hours=3)
        return expiration

    @abstractmethod
    def token(self, base_url: str, username: str, password: str) -> Dict:
        """Request a new token set with the password grant.

        Returns the decoded JSON reply with an added ``expiration`` key.
        """
        response = requests.post(
            f"{base_url}{self.PATH}",
            params={
                "grant_type": self.TOKEN_GRANT_TYPE,
                "username": username,
                "password": password,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()
        response["expiration"] = self._generate_expiration()
        return response

    @abstractmethod
    def extends(self, base_url: str, refresh_token: str) -> Dict:
        """Request a fresh token set with the refresh-token grant.

        Returns the decoded JSON reply with an added ``expiration`` key.
        """
        response = requests.post(
            f"{base_url}{self.PATH}",
            # The keyword must be ``params``; a misspelt keyword makes
            # ``requests.post`` raise TypeError before any request is sent.
            params={
                "grant_type": self.EXTENDS_GRANT_TYPE,
                "refresh_token": refresh_token,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()
        response["expiration"] = self._generate_expiration()
        return response
Subclasses
Class variables
var API_VERSION : APIVersion
var EXTENDS_GRANT_TYPE
var PATH
var TOKEN_GRANT_TYPE
Methods
def extends(self, base_url: str, refresh_token: str) ‑> Dict-
Expand source code
@abstractmethod def extends(self, base_url: str, refresh_token: str) -> Dict: response = requests.post( f"{base_url}{self.PATH}", paarams={ "grant_type": self.EXTENDS_GRANT_TYPE, "refresh_token": refresh_token, }, headers={ "Content-Type": "application/x-www-form-urlencoded", "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}", }, ).json() response["expiration"] = self._generate_expiration() return response def token(self, base_url: str, username: str, password: str) ‑> Dict-
Expand source code
@abstractmethod
def token(self, base_url: str, username: str, password: str) -> Dict:
    """Obtain a token set through the password grant.

    The credentials travel as query parameters of a POST to
    ``base_url + self.PATH``. The decoded JSON reply is returned after an
    ``expiration`` entry from ``self._generate_expiration()`` is attached.
    """
    endpoint = f"{base_url}{self.PATH}"
    credentials = {
        "grant_type": self.TOKEN_GRANT_TYPE,
        "username": username,
        "password": password,
    }
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
    }
    reply = requests.post(endpoint, params=credentials, headers=request_headers)
    payload = reply.json()
    payload["expiration"] = self._generate_expiration()
    return payload
class ExpirationBase (access: datetime.datetime, refresh: datetime.datetime)-
ExpirationBase(access: datetime.datetime, refresh: datetime.datetime)
Expand source code
@dataclass_json(undefined=Undefined.RAISE)
@dataclass
class ExpirationBase:
    """Expiry timestamps for the tokens of an authenticated session."""

    # Moment after which the access token must no longer be used.
    access: datetime
    # Moment after which the refresh token must no longer be used.
    refresh: datetime

    def access_expired(self) -> bool:
        """Return True once the access expiry time has been reached."""
        # Expired means "now" is at or past the stored expiry. ``now`` reuses
        # the stored tzinfo so naive and aware timestamps both compare.
        return datetime.now(tz=self.access.tzinfo) >= self.access

    def refresh_expired(self) -> bool:
        """Return True once the refresh expiry time has been reached."""
        return datetime.now(tz=self.refresh.tzinfo) >= self.refresh
Subclasses
Class variables
var access : datetime.datetime
var dataclass_json_config
var refresh : datetime.datetime
Static methods
def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A-
Expand source code
@classmethod def from_dict(cls: Type[A], kvs: Json, *, infer_missing=False) -> A: return _decode_dataclass(cls, kvs, infer_missing) def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A-
Expand source code
@classmethod def from_json(cls: Type[A], s: JsonData, *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) -> A: kvs = json.loads(s, parse_float=parse_float, parse_int=parse_int, parse_constant=parse_constant, **kw) return cls.from_dict(kvs, infer_missing=infer_missing) def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]-
Expand source code
@classmethod
def schema(cls: Type[A],
           *,
           infer_missing: bool = False,
           only=None,
           exclude=(),
           many: bool = False,
           context=None,
           load_only=(),
           dump_only=(),
           partial: bool = False,
           unknown=None) -> SchemaType:
    """Build a schema class for ``cls`` and return a configured instance.

    When ``unknown`` is not given, it is derived from the class's
    undefined-parameter action, if one is configured.
    """
    schema_cls = build_schema(cls, DataClassJsonMixin, infer_missing, partial)

    if unknown is None:
        action = _undefined_parameter_action_safe(cls)
        if action is not None:
            # The action names match the schema library's ``unknown`` keywords.
            unknown = action.name.lower()

    return schema_cls(
        only=only,
        exclude=exclude,
        many=many,
        context=context,
        load_only=load_only,
        dump_only=dump_only,
        partial=partial,
        unknown=unknown,
    )
Methods
def access_expired(self) ‑> bool-
Expand source code
def access_expired(self) -> bool: return self.access >= datetime.now() def refresh_expired(self) ‑> bool-
Expand source code
def refresh_expired(self) -> bool: return self.refresh >= datetime.now() def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]-
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]: return _asdict(self, encode_json=encode_json) def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str-
Expand source code
def to_json(self,
            *,
            skipkeys: bool = False,
            ensure_ascii: bool = True,
            check_circular: bool = True,
            allow_nan: bool = True,
            indent: Optional[Union[int, str]] = None,
            separators: Tuple[str, str] = None,
            default: Callable = None,
            sort_keys: bool = False,
            **kw) -> str:
    """Serialise this instance to a JSON string.

    The instance is first converted with ``to_dict(encode_json=False)``; the
    result is dumped with ``_ExtendedEncoder`` and the given ``json.dumps``
    options.
    """
    as_mapping = self.to_dict(encode_json=False)
    return json.dumps(
        as_mapping,
        cls=_ExtendedEncoder,
        skipkeys=skipkeys,
        ensure_ascii=ensure_ascii,
        check_circular=check_circular,
        allow_nan=allow_nan,
        indent=indent,
        separators=separators,
        default=default,
        sort_keys=sort_keys,
        **kw,
    )