project refactor

This commit is contained in:
Natan Keddem
2024-05-05 16:03:46 -04:00
parent c927ef4c2f
commit 005af9e3da
19 changed files with 535 additions and 327 deletions

View File

@ -1,11 +1,10 @@
import asyncio
from nicegui import ui # type: ignore
from autopve import elements as el
import autopve.logo as logo
from autopve.tabs import Tab
from autopve import logo as logo
from autopve.tabs.settings import Global, Network, Disk
from autopve.tabs.history import History
from autopve.tabs.system import System
from autopve.tabs.system import MustContain, MustNotContain
import logging
logger = logging.getLogger(__name__)
@ -42,7 +41,8 @@ class Content:
self._tab["disk"] = ui.tab(name="Disk").classes("text-secondary")
self._tab["history"] = ui.tab(name="History").classes("text-secondary")
if self._answer != "Default":
self._tab["system"] = ui.tab(name="System").classes("text-secondary")
self._tab["must_contain"] = ui.tab(name="Must Contain").classes("text-secondary")
self._tab["must_not_contain"] = ui.tab(name="Must Not Contain").classes("text-secondary")
with ui.row().classes("items-center"):
self._answer_display = ui.label(self._answer).classes("text-secondary text-h4")
logo.show()
@ -52,7 +52,11 @@ class Content:
async def _tab_changed(self, e):
if e.value == "History":
self._history.update_history()
self._history.update()
elif e.value == "Must Contain":
self._must_contain.update()
elif e.value == "Must Not Contain":
self._must_not_contain.update()
def _build_tab_panels(self):
self._tab_panels.clear()
@ -62,7 +66,8 @@ class Content:
self._disk_content = el.ContentTabPanel(self._tab["disk"])
self._history_content = el.ContentTabPanel(self._tab["history"])
if self._answer != "Default":
self._system_content = el.ContentTabPanel(self._tab["system"])
self._must_contain_content = el.ContentTabPanel(self._tab["must_contain"])
self._must_not_contain_content = el.ContentTabPanel(self._tab["must_not_contain"])
with self._global_content:
self._global = Global(answer=self._answer)
with self._network_content:
@ -72,8 +77,10 @@ class Content:
with self._history_content:
self._history = History(answer=self._answer)
if self._answer != "Default":
with self._system_content:
self._system = System(answer=self._answer)
with self._must_contain_content:
self._must_contain = MustContain(answer=self._answer)
with self._must_not_contain_content:
self._must_not_contain = MustNotContain(answer=self._answer)
async def answer_selected(self, name):
self._answer = name

View File

@ -1,9 +1,8 @@
from typing import Optional
from nicegui.events import KeyEventArguments
from nicegui import ui # type: ignore
from autopve import elements as el
from autopve import storage
from autopve.tabs import Tab
import logging
logger = logging.getLogger(__name__)
@ -88,7 +87,7 @@ class Drawer(object):
save = None
with ui.dialog() as answer_dialog, el.Card():
with el.DBody(height="[95vh]", width="[360px]"):
with el.DBody(height="fit", width="[320px]"):
with el.WColumn():
all_answers = list(storage.answers.keys())
for answer in list(storage.answers.keys()):
@ -106,16 +105,23 @@ class Drawer(object):
return False
return None
def enter_submit(e: KeyEventArguments) -> None:
if e.key == "Enter" and save_ea.no_errors is True:
answer_dialog.submit("save")
answer_input = el.VInput(label="answer", value=" ", invalid_characters="""'`"$\\;&<>|(){}""", invalid_values=all_answers, check=answer_check, max_length=20)
save_ea = el.ErrorAggregator(answer_input)
el.DButton("SAVE", on_click=lambda: answer_dialog.submit("save")).bind_enabled_from(save_ea, "no_errors")
ui.keyboard(on_key=enter_submit, ignore=[])
answer_input.value = name
result = await answer_dialog
if result == "save":
answer = answer_input.value.strip()
if len(answer) > 0 and name != "Default":
storage.answer(answer)
if name in storage.answers:
storage.answers[answer] = storage.answer(name, copy=True)
del storage.answers[name]
for row in self._table.rows:
if name == row["name"]:

View File

@ -1,74 +0,0 @@
from typing import Any, Dict, List, Optional, Union
import asyncio
import copy
import json
import tomlkit
from fastapi import Request
from fastapi.responses import PlainTextResponse
from nicegui import app, Client, ui # type: ignore
from autopve import elements as el
from autopve.drawer import Drawer
from autopve.content import Content
from autopve import storage
import autopve.tabs.history as history
import logging
logger = logging.getLogger(__name__)
def build():
@app.post("/answer")
async def post_answer(request: Request) -> PlainTextResponse:
def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]):
toml = tomlkit.dumps(data)
toml_fixed = ""
for line in toml.splitlines():
if len(line) > 0 and line[0] == '"':
line = line.replace('"', "", 2)
toml_fixed = toml_fixed + line + "\n"
r = history.Request(answer=answer, response=toml_fixed, system_info=copy.deepcopy(system_info))
history.History.add_history(r)
for client in Client.instances.values():
if not client.has_socket_connection:
continue
with client:
el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=30)
return PlainTextResponse(toml_fixed)
system_info = await request.json()
system_info_raw = json.dumps(system_info)
default_data = copy.deepcopy(storage.answer("Default"))
answers = list(storage.answers.keys())
if "Default" in answers:
answers.remove("Default")
for answer in answers:
answer_data = copy.deepcopy(storage.answer(answer))
if "match" in answer_data:
if len(answer_data["match"]) > 0 and answer_data["match"] in system_info_raw:
if "global" in default_data and "global" in answer_data:
default_data["global"].update(answer_data["global"])
if "network" in default_data and "network" in answer_data:
default_data["network"].update(answer_data["network"])
if "disk-setup" in default_data and "disk-setup" in answer_data:
default_data["disk-setup"].update(answer_data["disk-setup"])
return response(answer, system_info, default_data)
return response("Default", system_info, default_data)
@ui.page("/", response_timeout=30)
async def index(client: Client) -> None:
app.add_static_files("/static", "static")
el.load_element_css()
ui.colors(
primary=el.orange,
secondary=el.orange,
accent=el.orange,
dark=el.dark,
positive="#21BA45",
negative="#C10015",
info="#5C8984",
warning="#F2C037",
)
column = ui.column()
content = Content()
drawer = Drawer(column, content.answer_selected, content.hide)
drawer.build()

View File

@ -1,11 +1,10 @@
from typing import Any, Dict, Literal
from typing import Any, Dict
import json
from nicegui import app
import logging
logger = logging.getLogger(__name__)
configs_version = int(102)
configs_version = int(100)
configs_version_string = f"config_{configs_version}"
root = app.storage.general.get(configs_version_string, None)
if root is None:
@ -35,73 +34,10 @@ if "Default" not in answers:
}
def answer(name: str) -> dict:
def answer(name: str, copy: bool = False) -> dict:
if name not in answers:
answers[name] = {}
return answers[name]
# def algo(answer_name: str) -> dict:
# h = answer(answer_name)
# if "algo" not in h:
# h["algo"] = {}
# return h["algo"]
# def algo_sensor(answer_name: str, sensor: str) -> dict:
# a = algo(answer_name)
# if sensor not in a:
# a[sensor] = {}
# if "type" not in a[sensor]:
# a[sensor]["type"] = "curve"
# return a[sensor]
# def curve(answer_name: str, sensor: str) -> dict:
# s = algo_sensor(answer_name, sensor)
# if "curve" not in s:
# s["curve"] = {}
# return s["curve"]
# def curve_speed(answer_name: str, sensor: str, default=None) -> dict:
# c = curve(answer_name, sensor)
# if "speed" not in c:
# if default is None:
# c["speed"] = {
# "Min": None,
# "Low": None,
# "Medium": None,
# "High": None,
# "Max": None,
# }
# else:
# c["speed"] = default
# return c["speed"]
# def curve_temp(answer_name: str, sensor: str, default=None) -> dict:
# c = curve(answer_name, sensor)
# if "temp" not in c:
# if default is None:
# c["temp"] = {
# "Min": 30,
# "Low": 40,
# "Medium": 50,
# "High": 60,
# "Max": 70,
# }
# else:
# c["temp"] = default
# return c["temp"]
# def pid(answer_name: str, sensor: str) -> Dict[str, float]:
# s = algo_sensor(answer_name, sensor)
# if "pid" not in s:
# s["pid"] = {"Kp": 5, "Ki": 0.01, "Kd": 0.1, "Target": 40}
# return s["pid"]
# def pid_coefficient(answer_name: str, sensor: str, coefficient: Literal["Kp", "Ki", "Kd", "Target"]) -> float:
# return pid(answer_name, sensor)[coefficient]
if copy is False:
return answers[name]
else:
return json.loads(json.dumps(answers[name]))

View File

@ -1,104 +1,25 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
import autopve.elements as el
from autopve import storage
import logging
logger = logging.getLogger(__name__)
@dataclass(kw_only=True)
class Share:
history: List[Dict[str, Any]] = field(default_factory=list)
last_timestamp: float = 0
unique_system_information: List[str] = field(default_factory=list)
class Tab:
def __init__(self, answer: Optional[str] = None, table: Optional[str] = None) -> None:
self.answer: Optional[str] = answer
self.table: Optional[str] = table
self.picked_keys: Dict["str", Any] = {}
_share: Share = Share()
def __init__(self, answer: str, type: Optional[str] = None) -> None:
self.answer: str = answer
self.type: Optional[str] = type
self._elements: Dict[str, Any] = {}
self._build()
def _build(self):
pass
def key_picker(self, keys: Dict[str, Any]):
def keys_controls():
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
key_select = ui.select(list(keys.keys()), label="key", new_value_mode="add-unique", with_input=True)
key_select.tailwind.width("full")
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.current_help = None
self.current_key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True)
self.current_key.bind_value_from(key_select)
with ui.button(icon="help"):
self.current_help = ui.tooltip("NA")
ui.button(icon="add", on_click=lambda: add_key(self.current_key.value))
ui.separator()
self.keys_scroll = ui.scroll_area()
self.keys_scroll.tailwind.width("full").height("[480px]")
self.key_controls = {}
items = storage.answer(self.answer)
if self.table is not None and self.table in items:
for key, value in items[self.table].items():
if isinstance(value, list):
add_key(key, "[" + ",".join(str(v) for v in value) + "]")
else:
add_key(key, str(value))
def add_key(key, value=""):
if key is not None and key != "" and key not in self.picked_keys:
with self.keys_scroll:
with ui.row() as key_row:
key_row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.picked_keys[key] = value
self.key_controls[key] = {
"control": el.FInput(
key,
password=True if key == "root_password" else False,
autocomplete=keys[key]["options"] if key in keys and "options" in keys[key] else None,
on_change=lambda e, key=key: set_key(key, e.value),
),
"row": key_row,
}
self.key_controls[key]["control"].value = value
if key in keys:
with ui.button(icon="help"):
ui.tooltip(keys[key]["description"])
ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key))
def remove_key(key):
self.keys_scroll.remove(self.key_controls[key]["row"])
del self.picked_keys[key]
del self.key_controls[key]
def set_key(key, value: str):
if len(value) > 0:
if key in keys and "type" in keys[key]:
if keys[key]["type"] == "list":
self.picked_keys[key] = value[1:-1].split(",")
elif keys[key]["type"] == "int":
self.picked_keys[key] = int(value)
else:
self.picked_keys[key] = value
else:
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
self.picked_keys[key] = value[1:-1].split(",")
elif value.isnumeric():
self.picked_keys[key] = int(value)
else:
self.picked_keys[key] = value
storage.answer(self.answer)[self.table] = self.picked_keys
def key_changed(e):
if self.current_help is not None:
if e.value in keys:
self.current_help.text = keys[e.value]["description"]
else:
self.current_help.text = "NA"
keys_controls()

View File

@ -2,9 +2,11 @@ import asyncio
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
import time
import json
import re
from nicegui import app, ui # type: ignore
from . import Tab
import autopve.elements as el
from autopve import elements as el
import logging
logger = logging.getLogger(__name__)
@ -32,7 +34,7 @@ class SelectionConfirm:
self._label = label
self._visible = None
self._request = None
self._submitted = None
self._submitted: Optional[asyncio.Event] = None
with self._container:
self._label = ui.label(self._label).tailwind().text_color("primary")
self._done = el.IButton(icon="done", on_click=lambda: self.submit("confirm"))
@ -66,8 +68,6 @@ class SelectionConfirm:
class History(Tab):
_history: List[Dict[str, Any]] = []
def _build(self):
async def display_request(e):
if e.args["data"]["system_info"] is not None and e.args["data"]["response"] is not None:
@ -137,7 +137,7 @@ class History(Tab):
"maxWidth": 200,
},
],
"rowData": self._history,
"rowData": self._share.history,
},
theme="balham-dark",
)
@ -160,14 +160,27 @@ class History(Tab):
self._grid.options["rowSelection"] = row_selection
self._grid.update()
def update_history(self):
def update(self):
self._grid.update()
@classmethod
def add_history(cls, request: Request) -> None:
if len(cls._history) > 1000:
cls._history.pop(0)
cls._history.append({"timestamp": request.timestamp, "name": request.name, "answer": request.answer, "response": request.response, "system_info": request.system_info})
if len(cls._share.history) > 1000:
cls._share.history.pop(0)
cls._share.history.append(
{
"timestamp": request.timestamp,
"name": request.name,
"answer": request.answer,
"response": request.response,
"system_info": request.system_info,
}
)
cls._share.last_timestamp = request.timestamp
matches = re.findall(r"(\"[^\"]+\"\s*:\s*(\"[^\"]+\"|\d+|true|false))", json.dumps(request.system_info))
for match in matches:
if str(match[0]) not in cls._share.unique_system_information:
cls._share.unique_system_information.append(str(match[0]))
async def _remove_history(self):
self._set_selection(mode="multiple")
@ -175,6 +188,6 @@ class History(Tab):
if request == "confirm":
rows = await self._grid.get_selected_rows()
for row in rows:
self._history.remove(row)
self._share.history.remove(row)
self._grid.update()
self._set_selection()

View File

@ -1,16 +1,110 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
from typing import Any, Dict, Optional
from nicegui import ui
from . import Tab
import autopve.elements as el
from autopve import elements as el
from autopve import storage
import logging
logger = logging.getLogger(__name__)
class Global(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
class Setting(Tab):
def __init__(self, answer: str, type: Optional[str] = None, keys: Dict[str, Dict[str, Any]] = {}) -> None:
self.keys: Dict[str, Dict[str, Any]] = keys
super().__init__(answer, type=type)
def _build(self):
self.key_picker()
def key_picker(self):
def keys_controls():
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
key_select = ui.select(list(self.keys.keys()), label="key", new_value_mode="add", with_input=True)
key_select.tailwind.width("full")
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.help = None
key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True)
key.bind_value_from(key_select)
with ui.button(icon="help"):
self.help = ui.tooltip("NA")
ui.button(icon="add", on_click=lambda key=key: add_key(key.value))
ui.separator()
self._scroll = ui.scroll_area()
self._scroll.tailwind.width("full").height("[480px]")
items = storage.answer(self.answer)
if self.type is not None and self.type in items:
for key, value in items[self.type].items():
if isinstance(value, list):
add_key(key, "[" + ",".join(str(v) for v in value) + "]")
else:
add_key(key, str(value))
def add_key(key: str, value: str = ""):
if key is not None and key != "" and key not in self._elements.keys():
with self._scroll:
with ui.row() as key_row:
key_row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self._elements[key] = {
"control": el.FInput(
key,
password=True if key == "root_password" else False,
autocomplete=self.keys[key]["options"] if key in self.keys and "options" in self.keys[key] else None,
on_change=lambda e, key=key: set_key(key, e.value),
),
"row": key_row,
}
self._elements[key]["control"].value = value
if key in self.keys:
with ui.button(icon="help"):
ui.tooltip(self.keys[key]["description"])
ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key))
def remove_key(key):
self._scroll.remove(self._elements[key]["row"])
del self._elements[key]
def set_key(key, value: str):
v: Any = None
if len(value) > 0:
if key in self.keys and "type" in self.keys[key]:
if self.keys[key]["type"] == "list":
v = value[1:-1].split(",")
elif self.keys[key]["type"] == "int":
v = int(value)
else:
v = value
else:
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
v = value[1:-1].split(",")
elif value.isnumeric():
v = int(value)
else:
v = value
if self.type not in storage.answer(self.answer):
storage.answer(self.answer)[self.type] = {}
storage.answer(self.answer)[self.type][key] = v
def key_changed(e):
if self.help is not None:
if e.value in self.keys:
self.help.text = self.keys[e.value]["description"]
else:
self.help.text = "NA"
keys_controls()
class Global(Setting):
def __init__(self, answer: str) -> None:
keys = {
"keyboard": {"description": "The keyboard layout with the following possible options"},
"country": {"description": "The country code in the two letter variant. For example, at, us or fr."},
"fqdn": {"description": "The fully qualified domain name of the host. The domain part will be used as the search domain."},
@ -22,30 +116,24 @@ class Global(Tab):
"description": "If set to true, the installer will reboot automatically when an error is encountered. The default behavior is to wait to give the administrator a chance to investigate why the installation failed."
},
}
super().__init__(answer=answer, table="global")
def _build(self):
self.key_picker(keys=self.keys)
super().__init__(answer, type="global", keys=keys)
class Network(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
class Network(Setting):
def __init__(self, answer: str) -> None:
keys = {
"source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer."},
"cidr": {"description": "The IP address in CIDR notation. For example, 192.168.1.10/24."},
"dns": {"description": "The IP address of the DNS server."},
"gateway": {"description": "The IP address of the default gateway."},
"filter": {"description": "Filter against the UDEV properties to select the network card. See filters."},
}
super().__init__(answer=answer, table="network")
def _build(self):
self.key_picker(keys=self.keys)
super().__init__(answer, type="network", keys=keys)
class Disk(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
class Disk(Setting):
def __init__(self, answer: str) -> None:
keys = {
"filesystem": {"description": "One of the following options: ext4, xfs, zfs, or btrfs.", "options": ["ext4", "xfs", "zfs", "btrfs"]},
"disk_list": {"description": 'List of disks to use. Useful if you are sure about the disk names. For example: disk_list = ["sda", "sdb"].'},
"filter": {"description": "Filter against UDEV properties to select the disks for the installation. See filters."},
@ -74,7 +162,4 @@ class Disk(Tab):
},
"btrfs.hdsize": {"description": ""},
}
super().__init__(answer=answer, table="disk-setup")
def _build(self):
self.key_picker(keys=self.keys)
super().__init__(answer, type="disk-setup", keys=keys)

View File

@ -1,8 +1,7 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
from typing import Optional
from nicegui import ui
from . import Tab
import autopve.elements as el
from autopve import elements as el
from autopve import storage
import logging
@ -10,12 +9,77 @@ logger = logging.getLogger(__name__)
class System(Tab):
def __init__(self, answer=None) -> None:
super().__init__(answer)
def __init__(self, answer: str, type: Optional[str] = None, note: str = "") -> None:
self.note: str = note
self.select: Optional[ui.select] = None
self.last_update_timestamp: float = 0
super().__init__(answer, type=type)
def _build(self):
def set_match(match: str):
storage.answer(self.answer)["match"] = match
self.restriction_picker()
answer = storage.answer(self.answer)
el.FInput("Match String", value=answer["match"] if "match" in answer else "", on_change=lambda e: set_match(e.value))
def restriction_picker(self):
def restriction_controls():
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
self.select = ui.select(self._share.unique_system_information, new_value_mode="add", with_input=True)
self.select.tailwind.width("full")
card.on("mousemove", handler=self.update, throttle=3)
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
restriction = el.FInput(read_only=True)
restriction.tailwind.width("[420px]")
restriction.bind_value_from(self.select)
ui.button(icon="add", on_click=lambda restriction=restriction: add_restriction(restriction.value))
ui.label(self.note).tailwind.align_self("center")
ui.separator()
self.scroll = ui.scroll_area()
self.scroll.tailwind.width("full").height("[480px]")
restrictions = []
if self.type in storage.answer(self.answer):
restrictions = storage.answer(self.answer)[self.type]
for restriction in restrictions:
add_restriction(restriction)
def add_restriction(restriction: str):
if restriction is not None and restriction.strip() != "" and restriction not in self._elements.keys():
with self.scroll:
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self._elements[restriction] = {
"control": el.FInput(value=restriction, read_only=True),
"row": row,
}
self._elements[restriction]["control"].tailwind.width("[420px]")
ui.button(icon="remove", on_click=lambda _, r=restriction: remove_restriction(r))
if self.type not in storage.answer(self.answer):
storage.answer(self.answer)[self.type] = []
if restriction not in storage.answer(self.answer)[self.type]:
storage.answer(self.answer)[self.type].append(restriction)
def remove_restriction(restriction):
self.scroll.remove(self._elements[restriction]["row"])
del self._elements[restriction]
if restriction in storage.answer(self.answer)[self.type]:
storage.answer(self.answer)[self.type].remove(restriction)
restriction_controls()
def update(self):
if self.select is not None and self._share.last_timestamp > self.last_update_timestamp:
self.last_update_timestamp = self._share.last_timestamp
self.select.update()
class MustContain(System):
def __init__(self, answer: str) -> None:
super().__init__(answer, type="must_contain", note="The system information must contain at least one of these strings.")
class MustNotContain(System):
def __init__(self, answer: str) -> None:
super().__init__(answer, type="must_not_contain", note="The system information must not contain any of these strings.")