notes/jsondb.py

162 lines
3.9 KiB
Python

from typing import Any
from json import loads, dumps, decoder
class JSONDBArray:
    """Wrapper around one list stored in the database.

    Query methods return ``(index, element)`` tuples; the index can be fed
    back into :meth:`change_by_index` or :meth:`remove_by_index`. Every
    mutating method calls ``db._write()`` right after changing the list.
    """

    def __init__(self, array: list, db):
        """Wrap *array*; *db* must provide a ``_write()`` method."""
        self._array = array
        self._db = db

    def _dict_entries(self):
        """Yield ``(index, element)`` for every element that is a dict."""
        for index, element in enumerate(self._array):
            if isinstance(element, dict):
                yield index, element

    def all(self) -> list:
        """Return every element as an ``(index, element)`` tuple."""
        return list(enumerate(self._array))

    def get_by_key(self, key: str, value):
        """Return dict elements whose *key* field equals *value*.

        Elements that are not dicts or lack *key* are skipped.
        """
        return [
            (index, element)
            for index, element in self._dict_entries()
            if key in element and element[key] == value
        ]

    def get_by_occurence_in_array(self, key: str, value) -> list:
        """Return dict elements whose *key* field contains *value*.

        Containment is tested with ``value in element[key]``. Elements that
        are not dicts, lack *key*, or hold a field that does not support the
        membership test (for example a number or ``None``) are skipped.
        """
        results = []
        for index, element in self._dict_entries():
            if key not in element:
                continue
            try:
                found = value in element[key]
            except TypeError:
                # The field cannot be searched for this value; treat it as
                # a non-match instead of aborting the whole query.
                continue
            if found:
                results.append((index, element))
        return results

    def get_by_occurence_in_string(self, keys: list[str], value: str) -> list:
        """Return dict elements where *value* occurs in any of the *keys* fields.

        The comparison is a case-insensitive substring test. Each element is
        reported at most once. Fields that are missing or are not strings
        are ignored.
        """
        needle = value.lower()  # loop-invariant, compute once
        results = []
        for index, element in self._dict_entries():
            for key in keys:
                field = element.get(key)
                # Non-string fields (numbers, null, lists) cannot be
                # lower-cased, so they simply do not match.
                if isinstance(field, str) and needle in field.lower():
                    results.append((index, element))
                    break
        return results

    def change_by_index(self, index: int, new):
        """Replace the element at *index* with *new* and call ``db._write()``.

        Raises:
            IndexError: if *index* is out of range (nothing is written).
        """
        self._array[index] = new
        self._db._write()

    def add(self, element):
        """Append *element* to the list and call ``db._write()``."""
        self._array.append(element)
        self._db._write()

    def remove_by_index(self, index: int):
        """Remove the element at *index* and call ``db._write()``.

        Raises:
            IndexError: if *index* is out of range (nothing is written).
        """
        self._array.pop(index)
        self._db._write()
class JSONDBObject:
    """Wrapper around one dict stored in the database.

    Mutating methods call ``db._write()`` after changing the dict.
    """

    def __init__(self, obj: dict, db):
        """Wrap *obj*; *db* must provide a ``_write()`` method."""
        self._db = db
        self._obj = obj

    def get(self, key: str) -> JSONDBArray | Any:
        """Look up *key* in the wrapped dict.

        A list value comes back wrapped in a ``JSONDBArray`` bound to the
        same database; any other value (or ``None`` for a missing key) is
        returned unchanged.
        """
        value = self._obj.get(key)
        return JSONDBArray(value, self._db) if isinstance(value, list) else value

    def keys(self) -> list:
        """Return the keys of the wrapped dict as a new list."""
        return [*self._obj.keys()]

    def add(self, key: str, value):
        """Store *value* under *key* and call ``db._write()``."""
        self._obj[key] = value
        self._db._write()

    def remove(self, key: str):
        """Delete *key* and call ``db._write()``.

        Raises:
            KeyError: if *key* is not present (nothing is written).
        """
        del self._obj[key]
        self._db._write()
class JSONDB:
    """Main JSONDB class.

    Keeps the whole database as one dict in memory, loaded from a JSON file
    when the instance is created and rewritten in full by :meth:`_write`.
    """

    def __init__(self, filename: str) -> None:
        """Open the database stored in *filename*, creating the file if needed.

        Raises:
            Exception: if the file exists but does not hold a JSON object.
        """
        self._filename = filename
        self._db = {}
        # Append mode creates the file when it is missing and leaves
        # existing content untouched.
        with open(self._filename, "a"):
            pass
        self._read()

    def _read(self) -> None:
        """Load the file content into ``self._db``.

        An empty (or whitespace-only) file is treated as an empty database.

        Raises:
            Exception: if the content is not valid JSON or its top-level
                value is not an object.
        """
        with open(self._filename, "r", encoding="utf-8") as f:
            content = f.read()
        if not content.strip():
            self._db = {}
            return
        try:
            data = loads(content)
        except decoder.JSONDecodeError as err:
            raise Exception("DB file is not correct") from err
        # Every other method indexes the database by section name, so the
        # root must be a JSON object.
        if not isinstance(data, dict):
            raise Exception("DB file is not correct")
        self._db = data

    def _write(self) -> None:
        """Serialize ``self._db`` and overwrite the database file with it."""
        # Serialize first so a failure in dumps() does not truncate the file.
        content = dumps(self._db)
        with open(self._filename, "w", encoding="utf-8") as f:
            f.write(content)

    def ensure(self, key: str, type: str) -> bool:
        """Make sure the section *key* exists in the JSON database.

        Args:
            key: Name of the top-level section.
            type: ``"array"`` to create an empty list or ``"object"`` to
                create an empty dict when the section is missing.

        Returns:
            True if the section already existed, False if it was just
            created (and written to the file).

        Raises:
            Exception: if the section is missing and *type* is neither
                ``"array"`` nor ``"object"``.
        """
        if key in self._db:
            return True
        if type == "array":
            self._db[key] = []
        elif type == "object":
            self._db[key] = {}
        else:
            raise Exception("Unknown ensure type")
        self._write()
        return False

    def get(self, key: str) -> "JSONDBObject | JSONDBArray | None":
        """Read a section of the JSON database.

        Returns:
            A ``JSONDBArray`` for a list section, a ``JSONDBObject`` for a
            dict section, and None when the section is missing or holds any
            other kind of value.
        """
        val = self._db.get(key)
        if isinstance(val, list):
            return JSONDBArray(val, self)
        if isinstance(val, dict):
            return JSONDBObject(val, self)
        return None