import requests from django.conf import settings from collections import Sequence class AuthenticationException (Exception): pass class APIConnection: def __init__(self, servico, recurso, path = ""): self.token = None self.servico = servico self.recurso = recurso self.path = path def _do_get(self, pk, query, headers = {}): if pk: path = str(pk) + "/" + self.path url = settings.SIGS_API_BASE_URL.format(servico = self.servico, recurso = self.recurso, path = path) return requests.get( url, headers= { 'Authorization': "Bearer {}".format(self.token), 'x-api-key': settings.SIGS_API_KEY, **headers, } ) else: path = self.path url = settings.SIGS_API_BASE_URL.format(servico = self.servico, recurso = self.recurso, path = path) return requests.get( url, params = query, headers= { 'Authorization': "Bearer {}".format(self.token), 'x-api-key': settings.SIGS_API_KEY, **headers, } ) def _authenticate(self): form_data = { 'grant_type': 'client_credentials', 'client_id': settings.SIGS_API_CLIENT_ID, 'client_secret': settings.SIGS_API_CLIENT_SECRET, } headers = { 'x-api-key': settings.SIGS_API_KEY, 'Content-Type': 'application/x-www-form-urlencoded', } response = requests.post( settings.SIGS_API_AUTHENTICATION_URL, data = form_data, headers = headers, ) self.token = response.json()['access_token'] def get(self, pk = None, query = {}, headers = {}): if not self.token: self._authenticate() try: r = self._do_get(pk, query, headers) if r.status_code in [401]: raise AuthenticationException() return r except: try: self._authenticate() return self._do_get(pk, query, headers) except: raise ConnectionRefusedError() class BasicFilter: def __init__(self, key, value = None): self.keys = key.split('__') self.value = value def __call__(self, object) -> bool: for key in self.keys: object = getattr(object, key) if self.value == None: return key in object else: return object == self.value def __str__(self) -> str: return '{keys} == {value}'.format(keys = self.keys, value = self.value) class SIGsAPIQuerySet: def __init__(self, the_class, api_filters = {}, basic_filters = [], limit = 100, _results = [], _last_response = None, _last_results = [], api_query_fields_list = [], ): self.the_class = the_class if not self.the_class._meta.path: self.connection = APIConnection( servico = the_class._meta.servico, recurso = the_class._meta.recurso, ) else: self.connection = APIConnection( servico = the_class._meta.servico, recurso = the_class._meta.recurso, path = the_class._meta.path, ) self.api_filters = api_filters self.basic_filters = basic_filters self.limit = limit self._results = _results self._last_response = _last_response self._last_results = _last_results self.api_query_fields_list = api_query_fields_list or self.the_class._meta.api_query_fields_list or [] self.stop_iteration_flag = False self.offset = 0 self._prefetch_related_lookups = False self.pk = None def copy(self): return SIGsAPIQuerySet( the_class=self.the_class, api_filters={**self.api_filters}, basic_filters=[ x for x in self.basic_filters], limit = self.limit, _results = [ x for x in self._results ], _last_response = self._last_response, _last_results = [ x for x in self._last_results], api_query_fields_list= [x for x in self.api_query_fields_list] ) def all(self): return self # Faça nada... def get(self, pk = None, **kwargs): if pk: #Execute imediatamente self.pk = pk if self.pk: response = self.connection.get(pk = self.pk, query = {}) object = response.json() try: return self.the_class(object[0]) except KeyError: return self.the_class(object) elif kwargs: # realize um filtro e pegue o único resultado return self.filter(**kwargs).get() else: self._restart() self._fetch_all() if len(self._results)>1: raise self.the_class.MultipleObjectsReturned() elif len(self._results)<1: raise self.the_class.DoesNotExist() else: return self._results[0] def filter(self, **kwargs): new = self.copy() for key, value in kwargs.items(): if key in self.the_class._meta.pk_fields: new.pk = value elif len(self.api_query_fields_list) == 0 or key in self.api_query_fields_list: key = key.replace('_','-') new.api_filters[key] = value else: new.basic_filters.append(BasicFilter(key, value)) return new def _restart(self): self.offset = 0 self._results = [] def _single_fetch(self): response = self.connection.get(query = { 'offset': self.offset, 'limit': self.limit, **self.api_filters, }) self._last_response = response def _save_results(self): if not self._last_response: return objects = self._last_response.json() #if len(objects) == 0: # self.stop_iteration_flag = True objects = [ self.the_class(o) for o in objects ] objects = [ o for o in objects if self._filter(o)] self._last_results = objects self._results = self._results + objects # self._last_response = None def _next(self): self._single_fetch() self.offset += self.limit if len(self._last_response.json()) == 0: return False self._save_results() return True def _filter(self, o): return all( [ condition(o) for condition in self.basic_filters ]) def count(self): return self.__len__() def _len_api(self): response = self.connection.get(query = { 'offset': self.offset, 'limit': self.limit, **self.api_filters, }, headers = { 'paginado': 'true', }) return int(float(response.headers['X-Total'])) def _getitem_api(self, i): response = self.connection.get(query = { 'offset': i , 'limit': 1, **self.api_filters, }) return response.json()[0] # Métodos especiais do Python def __getitem__(self, i): if isinstance(i, slice): self._fetch_all() return self._results[i] elif i < len(self._results): return self._results[i] elif ((i - len(self._results)) > 100) and ( len(self.basic_filters) == 0): return self.the_class(self._getitem_api(i)) else: while i >= len(self._results): self._next() return self._results[i] def __len__(self): if len(self.basic_filters) == 0: return self._len_api() else: self.fetch_all() return len(self._results) def __iter__(self): self._restart() while self._next(): for r in self._last_results: yield r def _fetch_all(self): self._restart() while self._next(): pass def iterator(self): self._fetch_all() for r in self._results: yield r @property def model(self): return self.the_class class SIGsAPIModelManager: def __init__(self, the_class): self.queryset = SIGsAPIQuerySet(the_class=the_class) def all(self): return self.queryset.all() def get(self, *args, **kwargs): return self.queryset.get(*args, **kwargs) def filter(self, *args, **kwargs): return self.queryset.filter(*args, **kwargs) class SIGsAPIRelationModelManager(SIGsAPIModelManager): def __init__(self, the_class, filters): super().__init__(the_class = the_class) self.queryset = self.queryset.filter(**filters)