Module piate.api.authentication

Expand source code
from abc import abstractmethod
from dataclasses import dataclass
from typing import Dict, NewType
from datetime import datetime, timedelta

import requests
from dataclasses_json import dataclass_json, Undefined

from piate.api.version import APIVersion

RefreshToken = NewType("RefreshToken", str)


@dataclass_json(undefined=Undefined.RAISE)
@dataclass
class ExpirationBase:
    access: datetime
    refresh: datetime

    def access_expired(self) -> bool:
        return self.access >= datetime.now()

    def refresh_expired(self) -> bool:
        return self.refresh >= datetime.now()


class AuthenticationBase:
    PATH = "/oauth2/token"
    TOKEN_GRANT_TYPE = "password"
    EXTENDS_GRANT_TYPE = "refresh_token"
    API_VERSION: APIVersion

    def _generate_expiration(self) -> Dict:
        now = datetime.now()
        expiration = {
            "access": now + timedelta(hours=3),
            "refresh": now + timedelta(hours=12),
        }
        if self.API_VERSION == APIVersion.V2:
            expiration["id"] = now + timedelta(hours=3)
        return expiration

    @abstractmethod
    def token(self, base_url: str, username: str, password: str) -> Dict:
        response = requests.post(
            f"{base_url}{self.PATH}",
            params={
                "grant_type": self.TOKEN_GRANT_TYPE,
                "username": username,
                "password": password,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()

        response["expiration"] = self._generate_expiration()
        return response

    @abstractmethod
    def extends(self, base_url: str, refresh_token: str) -> Dict:
        response = requests.post(
            f"{base_url}{self.PATH}",
            paarams={
                "grant_type": self.EXTENDS_GRANT_TYPE,
                "refresh_token": refresh_token,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()

        response["expiration"] = self._generate_expiration()
        return response

Sub-modules

piate.api.authentication.v1
piate.api.authentication.v2

Classes

class AuthenticationBase
Expand source code
class AuthenticationBase:
    PATH = "/oauth2/token"
    TOKEN_GRANT_TYPE = "password"
    EXTENDS_GRANT_TYPE = "refresh_token"
    API_VERSION: APIVersion

    def _generate_expiration(self) -> Dict:
        now = datetime.now()
        expiration = {
            "access": now + timedelta(hours=3),
            "refresh": now + timedelta(hours=12),
        }
        if self.API_VERSION == APIVersion.V2:
            expiration["id"] = now + timedelta(hours=3)
        return expiration

    @abstractmethod
    def token(self, base_url: str, username: str, password: str) -> Dict:
        response = requests.post(
            f"{base_url}{self.PATH}",
            params={
                "grant_type": self.TOKEN_GRANT_TYPE,
                "username": username,
                "password": password,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()

        response["expiration"] = self._generate_expiration()
        return response

    @abstractmethod
    def extends(self, base_url: str, refresh_token: str) -> Dict:
        response = requests.post(
            f"{base_url}{self.PATH}",
            paarams={
                "grant_type": self.EXTENDS_GRANT_TYPE,
                "refresh_token": refresh_token,
            },
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
            },
        ).json()

        response["expiration"] = self._generate_expiration()
        return response

Subclasses

Class variables

var API_VERSIONAPIVersion
var EXTENDS_GRANT_TYPE
var PATH
var TOKEN_GRANT_TYPE

Methods

def extends(self, base_url: str, refresh_token: str) ‑> Dict
Expand source code
@abstractmethod
def extends(self, base_url: str, refresh_token: str) -> Dict:
    response = requests.post(
        f"{base_url}{self.PATH}",
        paarams={
            "grant_type": self.EXTENDS_GRANT_TYPE,
            "refresh_token": refresh_token,
        },
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
        },
    ).json()

    response["expiration"] = self._generate_expiration()
    return response
def token(self, base_url: str, username: str, password: str) ‑> Dict
Expand source code
@abstractmethod
def token(self, base_url: str, username: str, password: str) -> Dict:
    response = requests.post(
        f"{base_url}{self.PATH}",
        params={
            "grant_type": self.TOKEN_GRANT_TYPE,
            "username": username,
            "password": password,
        },
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": f"application/vnd.iate.token+json; version={self.API_VERSION.value}",
        },
    ).json()

    response["expiration"] = self._generate_expiration()
    return response
class ExpirationBase (access: datetime.datetime, refresh: datetime.datetime)

ExpirationBase(access: datetime.datetime, refresh: datetime.datetime)

Expand source code
@dataclass_json(undefined=Undefined.RAISE)
@dataclass
class ExpirationBase:
    access: datetime
    refresh: datetime

    def access_expired(self) -> bool:
        return self.access >= datetime.now()

    def refresh_expired(self) -> bool:
        return self.refresh >= datetime.now()

Subclasses

Class variables

var access : datetime.datetime
var dataclass_json_config
var refresh : datetime.datetime

Static methods

def from_dict(kvs: Union[dict, list, str, int, float, bool, ForwardRef(None)], *, infer_missing=False) ‑> ~A
Expand source code
@classmethod
def from_dict(cls: Type[A],
              kvs: Json,
              *,
              infer_missing=False) -> A:
    return _decode_dataclass(cls, kvs, infer_missing)
def from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) ‑> ~A
Expand source code
@classmethod
def from_json(cls: Type[A],
              s: JsonData,
              *,
              parse_float=None,
              parse_int=None,
              parse_constant=None,
              infer_missing=False,
              **kw) -> A:
    kvs = json.loads(s,
                     parse_float=parse_float,
                     parse_int=parse_int,
                     parse_constant=parse_constant,
                     **kw)
    return cls.from_dict(kvs, infer_missing=infer_missing)
def schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) ‑> dataclasses_json.mm.SchemaF[~A]
Expand source code
@classmethod
def schema(cls: Type[A],
           *,
           infer_missing: bool = False,
           only=None,
           exclude=(),
           many: bool = False,
           context=None,
           load_only=(),
           dump_only=(),
           partial: bool = False,
           unknown=None) -> SchemaType:
    Schema = build_schema(cls, DataClassJsonMixin, infer_missing, partial)

    if unknown is None:
        undefined_parameter_action = _undefined_parameter_action_safe(cls)
        if undefined_parameter_action is not None:
            # We can just make use of the same-named mm keywords
            unknown = undefined_parameter_action.name.lower()

    return Schema(only=only,
                  exclude=exclude,
                  many=many,
                  context=context,
                  load_only=load_only,
                  dump_only=dump_only,
                  partial=partial,
                  unknown=unknown)

Methods

def access_expired(self) ‑> bool
Expand source code
def access_expired(self) -> bool:
    return self.access >= datetime.now()
def refresh_expired(self) ‑> bool
Expand source code
def refresh_expired(self) -> bool:
    return self.refresh >= datetime.now()
def to_dict(self, encode_json=False) ‑> Dict[str, Union[dict, list, str, int, float, bool, ForwardRef(None)]]
Expand source code
def to_dict(self, encode_json=False) -> Dict[str, Json]:
    return _asdict(self, encode_json=encode_json)
def to_json(self, *, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Union[int, str, ForwardRef(None)] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) ‑> str
Expand source code
def to_json(self,
            *,
            skipkeys: bool = False,
            ensure_ascii: bool = True,
            check_circular: bool = True,
            allow_nan: bool = True,
            indent: Optional[Union[int, str]] = None,
            separators: Tuple[str, str] = None,
            default: Callable = None,
            sort_keys: bool = False,
            **kw) -> str:
    return json.dumps(self.to_dict(encode_json=False),
                      cls=_ExtendedEncoder,
                      skipkeys=skipkeys,
                      ensure_ascii=ensure_ascii,
                      check_circular=check_circular,
                      allow_nan=allow_nan,
                      indent=indent,
                      separators=separators,
                      default=default,
                      sort_keys=sort_keys,
                      **kw)