from typing import Any from json import loads, dumps, decoder class JSONDBArray: def __init__(self, array: list, db): self._array = array self._db = db def all(self) -> list: return [(a[0], a[1]) for a in enumerate(self._array)] def get_by_key(self, key: str, value): results = [] for i, obj in enumerate(self._array): if not isinstance(obj, dict): continue if not key in obj: continue if obj[key] == value: results.append((i, obj)) return results def get_by_occurence_in_array(self, key: str, value) -> list: results = [] for i, obj in enumerate(self._array): if not isinstance(obj, dict): continue if not key in obj: continue if value in obj[key]: results.append((i, obj)) return results def get_by_occurence_in_string(self, keys: list[str], value: str) -> list: results = [] for i, obj in enumerate(self._array): if not isinstance(obj, dict): continue for key in keys: if not key in obj: continue if value.lower() in obj[key].lower(): results.append((i, obj)) break return results def change_by_index(self, index: int, new): self._array[index] = new self._db._write() def add(self, element): self._array.append(element) self._db._write() def remove_by_index(self, index: int): self._array.pop(index) self._db._write() class JSONDBObject: def __init__(self, obj: dict, db): self._obj = obj self._db = db def get(self, key: str) -> JSONDBArray | Any: val = self._obj.get(key) if isinstance(val, list): return JSONDBArray(val, self._db) else: return val def keys(self) -> list: return list(self._obj.keys()) def add(self, key: str, value): self._obj[key] = value self._db._write() def remove(self, key: str): del self._obj[key] self._db._write() class JSONDB: """ JSONDB galvenā klase. """ def __init__(self, filename: str) -> None: self._filename = filename self._db = {} # Create a db file if does not exist f = open(self._filename, "a") f.close() self._read() def _read(self) -> None: f = open(self._filename, "r+", encoding="utf-8") fc = f.read() f.close() try: self._db = loads(fc) except decoder.JSONDecodeError: if len(fc) == 0: self._db = {} else: raise Exception("DB file is not correct") def _write(self) -> None: fc = dumps(self._db) f = open(self._filename, "w", encoding="utf-8") f.write(fc) f.close() def ensure(self, key: str, type: str) -> bool: """ Pārliecinās, ka attiecīgā sadaļa eksistē JSON datubāzē. """ if key in self._db: return True if type == "array": self._db[key] = [] elif type == "object": self._db[key] = {} else: raise Exception("Unknown ensure type") self._write() def get(self, key: str) -> JSONDBObject | JSONDBArray | None: """ Lasa JSON datubāzes sadaļu """ val = self._db.get(key) if val == None: return None if isinstance(val, list): return JSONDBArray(val, self) elif isinstance(val, object): return JSONDBObject(val, self)