This commit is contained in:
Natan Keddem
2024-04-30 19:45:13 -04:00
commit 5e33556dcf
26 changed files with 2023 additions and 0 deletions

0
autopve/__init__.py Normal file
View File

91
autopve/content.py Normal file
View File

@ -0,0 +1,91 @@
import asyncio
from nicegui import ui # type: ignore
from autopve import elements as el
import autopve.logo as logo
from autopve.tabs import Tab
from autopve.tabs.settings import Global, Network, Disk
from autopve.tabs.history import History
from autopve.tabs.system import System
import logging
logger = logging.getLogger(__name__)
class Content:
def __init__(self) -> None:
self._header = None
self._tabs = None
self._tab = {}
self._spinner = None
self._answer = None
self._tab_panels = None
self._grid = None
self._tab_panel = {}
self._answer = None
self._tasks = []
self._manage = None
self._automation = None
self._history = None
self._header = ui.header(bordered=True).classes("bg-dark q-pt-sm q-pb-xs")
self._header.tailwind.border_color(f"[{el.orange}]").min_width("[920px]")
self._header.visible = False
self._content = el.WColumn()
self._content.bind_visibility_from(self._header)
def _build(self):
with self._header:
with ui.row().classes("w-full h-16 justify-between items-center"):
self._tabs = ui.tabs()
with self._tabs:
self._tab["global"] = ui.tab(name="Global").classes("text-secondary")
self._tab["network"] = ui.tab(name="Network").classes("text-secondary")
self._tab["disk"] = ui.tab(name="Disk").classes("text-secondary")
self._tab["history"] = ui.tab(name="History").classes("text-secondary")
if self._answer != "Default":
self._tab["system"] = ui.tab(name="System").classes("text-secondary")
with ui.row().classes("items-center"):
self._answer_display = ui.label(self._answer).classes("text-secondary text-h4")
logo.show()
with self._content:
self._tab_panels = ui.tab_panels(self._tabs, value="Global", on_change=lambda e: self._tab_changed(e), animated=False)
self._tab_panels.tailwind.width("full")
async def _tab_changed(self, e):
if e.value == "History":
self._history.update_history()
def _build_tab_panels(self):
self._tab_panels.clear()
with self._tab_panels:
self._global_content = el.ContentTabPanel(self._tab["global"])
self._network_content = el.ContentTabPanel(self._tab["network"])
self._disk_content = el.ContentTabPanel(self._tab["disk"])
self._history_content = el.ContentTabPanel(self._tab["history"])
if self._answer != "Default":
self._system_content = el.ContentTabPanel(self._tab["system"])
with self._global_content:
self._global = Global(answer=self._answer)
with self._network_content:
self._network = Network(answer=self._answer)
with self._disk_content:
self._disk = Disk(answer=self._answer)
with self._history_content:
self._history = History(answer=self._answer)
if self._answer != "Default":
with self._system_content:
self._system = System(answer=self._answer)
async def answer_selected(self, name):
self._answer = name
self.hide()
self._header.clear()
self._content.clear()
self._build()
self._build_tab_panels()
self._header.visible = True
def hide(self):
if self._header is not None:
self._header.visible = False
if self._tab_panels is not None:
self._tab_panels.clear()

165
autopve/drawer.py Normal file
View File

@ -0,0 +1,165 @@
from typing import Optional
from nicegui import ui # type: ignore
from autopve import elements as el
from autopve import storage
from autopve.tabs import Tab
import logging
logger = logging.getLogger(__name__)
class Drawer(object):
def __init__(self, main_column, on_click, hide_content) -> None:
self._on_click = on_click
self._hide_content = hide_content
self._main_column = main_column
self._header_row = None
self._table = None
self._name = ""
self._answername = ""
self._username = ""
self._password = ""
self._buttons = {}
self._selection_mode = None
def build(self):
def toggle_drawer():
if chevron._props["icon"] == "chevron_left":
content.visible = False
drawer.props("width=0")
chevron.props("icon=chevron_right")
chevron.style("top: 16vh").style("right: -24px").style("height: 16vh")
else:
content.visible = True
drawer.props("width=200")
chevron.props("icon=chevron_left")
chevron.style("top: 16vh").style("right: -12px").style("height: 16vh")
with ui.left_drawer(top_corner=True).props("width=226 behavior=desktop bordered").classes("q-pa-none") as drawer:
with ui.column().classes("h-full w-full q-py-xs q-px-md") as content:
self._header_row = el.WRow().classes("justify-between")
self._header_row.tailwind().height("12")
with self._header_row:
with ui.row():
el.IButton(icon="add", on_click=self._display_answer_dialog)
self._buttons["remove"] = el.IButton(icon="remove", on_click=lambda: self._modify_answer("remove"))
self._buttons["edit"] = el.IButton(icon="edit", on_click=lambda: self._modify_answer("edit"))
ui.label(text="ANSWERS").classes("text-secondary")
self._table = (
ui.table(
[
{
"name": "name",
"label": "Name",
"field": "name",
"required": True,
"align": "center",
"sortable": True,
}
],
[],
row_key="name",
pagination={"rowsPerPage": 0, "sortBy": "name"},
on_select=lambda e: self._selected(e),
)
.on("rowClick", self._clicked, [[], ["name"], None])
.props("dense flat bordered binary-state-sort hide-header hide-pagination hide-selected-bannerhide-no-data")
)
self._table.tailwind.width("full")
self._table.visible = False
for name in storage.answers.keys():
self._add_answer_to_table(name)
chevron = ui.button(icon="chevron_left", color=None, on_click=toggle_drawer).props("padding=0px")
chevron.classes("absolute")
chevron.style("top: 16vh").style("right: -12px").style("background-color: #0E1210 !important").style("height: 16vh")
chevron.tailwind.border_color("[#E97451]")
chevron.props(f"color=primary text-color=accent")
def _add_answer_to_table(self, name):
if len(name) > 0:
for row in self._table.rows:
if name == row["name"]:
return
self._table.add_rows({"name": name})
self._table.visible = True
async def _display_answer_dialog(self, name=""):
save = None
with ui.dialog() as answer_dialog, el.Card():
with el.DBody(height="[95vh]", width="[360px]"):
with el.WColumn():
all_answers = list(storage.answers.keys())
for answer in list(storage.answers.keys()):
all_answers.append(answer.replace(" ", ""))
if name != "":
if name in all_answers:
all_answers.remove(name)
if name.replace(" ", "") in all_answers:
all_answers.remove(name.replace(" ", ""))
def answer_check(value: str) -> Optional[bool]:
spaceless = value.replace(" ", "")
for invalid_value in all_answers:
if invalid_value == spaceless:
return False
return None
answer_input = el.VInput(label="answer", value=" ", invalid_characters="""'`"$\\;&<>|(){}""", invalid_values=all_answers, check=answer_check, max_length=20)
save_ea = el.ErrorAggregator(answer_input)
el.DButton("SAVE", on_click=lambda: answer_dialog.submit("save")).bind_enabled_from(save_ea, "no_errors")
answer_input.value = name
result = await answer_dialog
if result == "save":
answer = answer_input.value.strip()
if len(answer) > 0 and name != "Default":
if name in storage.answers:
del storage.answers[name]
for row in self._table.rows:
if name == row["name"]:
self._table.remove_rows(row)
self._add_answer_to_table(answer)
def _modify_answer(self, mode):
self._hide_content()
self._selection_mode = mode
if mode is None:
self._table._props["selected"] = []
self._table.props("selection=none")
for icon, button in self._buttons.items():
button.props(f"icon={icon}")
elif self._buttons[mode]._props["icon"] == "close":
self._selection_mode = None
self._table._props["selected"] = []
self._table.props("selection=none")
for icon, button in self._buttons.items():
button.props(f"icon={icon}")
else:
self._table.props("selection=single")
for icon, button in self._buttons.items():
if mode == icon:
button.props("icon=close")
else:
button.props(f"icon={icon}")
async def _selected(self, e):
self._hide_content()
if self._selection_mode == "edit":
if len(e.selection) > 0 and e.selection[0]["name"] != "Default":
await self._display_answer_dialog(name=e.selection[0]["name"])
if self._selection_mode == "remove":
if len(e.selection) > 0:
for row in e.selection:
if row["name"] != "Default":
if row["name"] in storage.answers:
del storage.answers[row["name"]]
self._table.remove_rows(row)
self._modify_answer(None)
async def _clicked(self, e):
if "name" in e.args[1]:
answer = e.args[1]["name"]
if self._on_click is not None:
await self._on_click(answer)

374
autopve/elements.py Normal file
View File

@ -0,0 +1,374 @@
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from nicegui import ui, app, Tailwind
from nicegui.elements.notification import NotificationPosition, NotificationType # type: ignore
from nicegui.elements.spinner import SpinnerTypes # type: ignore
from nicegui.elements.tabs import Tab # type: ignore
from nicegui.tailwind_types.height import Height # type: ignore
from nicegui.tailwind_types.width import Width # type: ignore
from nicegui.elements.mixins.validation_element import ValidationElement # type: ignore
from nicegui.events import GenericEventArguments, handle_event # type: ignore
import logging
logger = logging.getLogger(__name__)
orange = "#E97451"
dark = "#0E1210"
def load_element_css():
ui.add_head_html(
f"""
<style>
.autopve-colors,
.q-table--dark,
.q-table--dark .q-table__bottom,
.q-table--dark td,
.q-table--dark th,
.q-table--dark thead,
.q-table--dark tr,
.q-table__card--dark,
body.body--dark .q-drawer,
body.body--dark .q-footer,
body.body--dark .q-header {{
color: {orange} !important;
border-color: {orange} !important;
}}
.full-size-stepper,
.full-size-stepper .q-stepper__content,
.full-size-stepper .q-stepper__step-content,
.full-size-stepper .q-stepper__step-inner {{
height: 100%;
width: 100%;
display: flex;
flex-direction: column;
align-items: center;
}}
.multi-line-notification {{
white-space: pre-line;
}}
.q-drawer--bordered{{
border-color: {orange} !important;
}}
</style>
"""
)
ui.add_head_html('<link href="static/jse-theme-dark.css" rel="stylesheet">')
class ErrorAggregator:
def __init__(self, *elements: ValidationElement) -> None:
self.elements: list[ValidationElement] = list(elements)
self.enable: bool = True
def clear(self):
self.elements.clear()
def append(self, element: ValidationElement):
self.elements.append(element)
def remove(self, element: ValidationElement):
self.elements.remove(element)
@property
def no_errors(self) -> bool:
if len(self.elements) > 0:
validators = all(validation(element.value) for element in self.elements for validation in element.validation.values())
return self.enable and validators
else:
return True
class WColumn(ui.column):
def __init__(self) -> None:
super().__init__()
self.tailwind.width("full").align_items("center")
class DBody(ui.column):
def __init__(self, height: Height = "[480px]", width: Width = "[240px]") -> None:
super().__init__()
self.tailwind.align_items("center").justify_content("between")
self.tailwind.height(height).width(width)
class WRow(ui.row):
def __init__(self) -> None:
super().__init__()
self.tailwind.width("full").align_items("center").justify_content("center")
class Card(ui.card):
def __init__(self) -> None:
super().__init__()
self.tailwind.border_color(f"[{orange}]")
class DInput(ui.input):
def __init__(
self,
label: str | None = None,
*,
placeholder: str | None = None,
value: str = " ",
password: bool = False,
password_toggle_button: bool = False,
on_change: Callable[..., Any] | None = None,
autocomplete: List[str] | None = None,
validation: Callable[..., Any] = bool,
) -> None:
super().__init__(
label,
placeholder=placeholder,
value=value,
password=password,
password_toggle_button=password_toggle_button,
on_change=on_change,
autocomplete=autocomplete,
validation={"": validation},
)
self.tailwind.width("full")
if value == " ":
self.value = ""
class VInput(ui.input):
def __init__(
self,
label: str | None = None,
*,
placeholder: str | None = None,
value: str = " ",
password: bool = False,
password_toggle_button: bool = False,
on_change: Callable[..., Any] | None = None,
autocomplete: List[str] | None = None,
invalid_characters: str = "",
invalid_values: List[str] = [],
max_length: int = 64,
check: Callable[..., Any] | None = None,
) -> None:
def checks(value: str) -> bool:
if value is None or value == "" or len(value) > max_length:
return False
for invalid_character in invalid_characters:
if invalid_character in value:
return False
for invalid_value in invalid_values:
if invalid_value == value:
return False
if check is not None:
check_status = check(value)
if check_status is not None:
return check_status
return True
super().__init__(
label,
placeholder=placeholder,
value=value,
password=password,
password_toggle_button=password_toggle_button,
on_change=on_change,
autocomplete=autocomplete,
validation={"": lambda value: checks(value)},
)
self.tailwind.width("full")
if value == " ":
self.value = ""
class FInput(ui.input):
def __init__(
self,
label: str | None = None,
*,
placeholder: str | None = None,
value: str = " ",
password: bool = False,
password_toggle_button: bool = False,
on_change: Callable[..., Any] | None = None,
autocomplete: List[str] | None = None,
validation: Callable[..., Any] = bool,
read_only: bool = False,
) -> None:
super().__init__(
label,
placeholder=placeholder,
value=value,
password=password,
password_toggle_button=password_toggle_button,
on_change=on_change,
autocomplete=autocomplete,
validation={} if read_only else {"": validation},
)
self.tailwind.width("[320px]")
if value == " ":
self.value = ""
if read_only:
self.props("readonly")
class DSelect(ui.select):
def __init__(
self,
options: Union[List, Dict],
*,
label: Optional[str] = None,
value: Any = None,
on_change: Optional[Callable[..., Any]] = None,
with_input: bool = False,
new_value_mode: Optional[Literal["add", "add-unique", "toggle"]] = None,
multiple: bool = False,
clearable: bool = False,
) -> None:
super().__init__(
options,
label=label,
value=value,
on_change=on_change,
with_input=with_input,
new_value_mode=new_value_mode,
multiple=multiple,
clearable=clearable,
)
self.tailwind.width("full")
if multiple is True:
self.props("use-chips")
class Select(ui.select):
def __init__(
self,
options: Union[List, Dict],
*,
label: Optional[str] = None,
value: Any = None,
on_change: Optional[Callable[..., Any]] = None,
with_input: bool = False,
new_value_mode: Optional[Literal["add", "add-unique", "toggle"]] = None,
multiple: bool = False,
clearable: bool = False,
) -> None:
super().__init__(
options,
label=label,
value=value,
on_change=on_change,
with_input=with_input,
new_value_mode=new_value_mode,
multiple=multiple,
clearable=clearable,
)
self.tailwind.width("1/2")
class DButton(ui.button):
def __init__(
self,
text: str = "",
*,
on_click: Callable[..., Any] | None = None,
color: Optional[str] = "primary",
icon: str | None = None,
) -> None:
super().__init__(text, on_click=on_click, color=color, icon=icon)
self.props("size=md")
self.tailwind.padding("px-2.5").padding("py-1")
class DCheckbox(ui.checkbox):
def __init__(self, text: str = "", *, value: bool = False, on_change: Callable[..., Any] | None = None) -> None:
super().__init__(text, value=value, on_change=on_change)
self.tailwind.text_color("secondary")
class IButton(ui.button):
def __init__(
self,
text: str = "",
*,
on_click: Callable[..., Any] | None = None,
color: Optional[str] = "primary",
icon: str | None = None,
) -> None:
super().__init__(text, on_click=on_click, color=color, icon=icon)
self.props("size=sm")
class SmButton(ui.button):
def __init__(
self,
text: str = "",
*,
on_click: Callable[..., Any] | None = None,
color: Optional[str] = "primary",
icon: str | None = None,
) -> None:
super().__init__(text, on_click=on_click, color=color, icon=icon)
self.props("size=sm")
self.tailwind.width("16")
class LgButton(ui.button):
def __init__(
self,
text: str = "",
*,
on_click: Callable[..., Any] | None = None,
color: Optional[str] = "primary",
icon: str | None = None,
) -> None:
super().__init__(text, on_click=on_click, color=color, icon=icon)
self.props("size=md")
class Notification(ui.notification):
def __init__(
self,
message: Any = "",
*,
position: NotificationPosition = "bottom",
close_button: Union[bool, str] = False,
type: NotificationType = None, # pylint: disable=redefined-builtin
color: Optional[str] = None,
multi_line: bool = False,
icon: Optional[str] = None,
spinner: bool = False,
timeout: Optional[float] = 5.0,
**kwargs: Any,
) -> None:
if multi_line:
super().__init__(message, position=position, close_button=True, type=type, color=color, multi_line=True, icon=icon, spinner=spinner, timeout=60)
else:
super().__init__(message, position=position, type=type, spinner=spinner, timeout=timeout)
class ContentTabPanel(ui.tab_panel):
def __init__(self, name: Tab | str) -> None:
super().__init__(name)
self.style("height: calc(100vh - 150px)")
self.tailwind.min_width("[920px]")
class FSelect(ui.select):
def __init__(
self,
options: Union[List, Dict],
*,
label: Optional[str] = None,
value: Any = None,
on_change: Optional[Callable[..., Any]] = None,
with_input: bool = False,
new_value_mode: Optional[Literal["add", "add-unique", "toggle"]] = None,
multiple: bool = False,
clearable: bool = False,
) -> None:
super().__init__(options, label=label, value=value, on_change=on_change, with_input=with_input, new_value_mode=new_value_mode, multiple=multiple, clearable=clearable)
self.tailwind.width("64")
class JsonEditor(ui.json_editor):
def __init__(self, properties: Dict, *, on_select: Optional[Callable] = None, on_change: Optional[Callable] = None) -> None:
super().__init__(properties, on_select=on_select, on_change=on_change)
self.classes("jse-theme-dark")
self.tailwind.height("[640px]").width("[640px]")

19
autopve/logo.py Normal file
View File

@ -0,0 +1,19 @@
from nicegui import ui # type: ignore
import logging
logger = logging.getLogger(__name__)
logo = """
<svg width="50px" height="50px" viewBox="0 0 48 48" fill="none" xmlns="http://www.w3.org/2000/svg">
<circle cx="24" cy="24" r="9" fill="#666666" stroke="#E97451" stroke-width="4"/>
<circle r="3" transform="matrix(-1 0 0 1 24 24)" fill="white"/>
<path d="M9 14C9 14 16.5 2.49997 29.5 6.99998C42.5 11.5 42 24.5 42 24.5" stroke="#E97451" stroke-width="4" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M39 34C39 34 33 45 19.5 41.5C6 38 6 24 6 24" stroke="#E97451" stroke-width="4" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M42 8V24" stroke="#E97451" stroke-width="4" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M6 24L6 40" stroke="#E97451" stroke-width="4" stroke-linecap="round" stroke-linejoin="round"/>
</svg>
"""
def show():
ui.html(logo)

74
autopve/page.py Normal file
View File

@ -0,0 +1,74 @@
from typing import Any, Dict, List, Optional, Union
import asyncio
import copy
import json
import tomlkit
from fastapi import Request
from fastapi.responses import PlainTextResponse
from nicegui import app, Client, ui # type: ignore
from autopve import elements as el
from autopve.drawer import Drawer
from autopve.content import Content
from autopve import storage
import autopve.tabs.history as history
import logging
logger = logging.getLogger(__name__)
def build():
@app.post("/answer")
async def post_answer(request: Request) -> PlainTextResponse:
def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]):
toml = tomlkit.dumps(default_data)
toml_fixed = ""
for line in toml.splitlines():
if len(line) > 0 and line[0] == '"':
line = line.replace('"', "", 2)
toml_fixed = toml_fixed + line + "\n"
r = history.Request(answer=answer, response=toml_fixed, system_info=dict(system_info))
history.History.add_history(r)
for client in Client.instances.values():
if not client.has_socket_connection:
continue
with client:
el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=30)
return PlainTextResponse(toml_fixed)
system_info = await request.json()
system_info_raw = json.dumps(system_info)
default_data = dict(storage.answer("Default"))
answers = list(storage.answers.keys())
if "Default" in answers:
answers.remove("Default")
for answer in answers:
answer_data = dict(storage.answer(answer))
if "match" in answer_data:
if len(answer_data["match"]) > 0 and answer_data["match"] in system_info_raw:
if "global" in default_data and "global" in answer_data:
default_data["global"].update(answer_data["global"])
if "network" in default_data and "network" in answer_data:
default_data["network"].update(answer_data["network"])
if "disk-setup" in default_data and "disk-setup" in answer_data:
default_data["disk-setup"].update(answer_data["disk-setup"])
return response(answer, system_info, default_data)
return response("Default", system_info, default_data)
@ui.page("/", response_timeout=30)
async def index(client: Client) -> None:
app.add_static_files("/static", "static")
el.load_element_css()
ui.colors(
primary=el.orange,
secondary=el.orange,
accent=el.orange,
dark=el.dark,
positive="#21BA45",
negative="#C10015",
info="#5C8984",
warning="#F2C037",
)
column = ui.column()
content = Content()
drawer = Drawer(column, content.answer_selected, content.hide)
drawer.build()

107
autopve/storage.py Normal file
View File

@ -0,0 +1,107 @@
from typing import Any, Dict, Literal
from nicegui import app
import logging
logger = logging.getLogger(__name__)
configs_version = int(102)
configs_version_string = f"config_{configs_version}"
root = app.storage.general.get(configs_version_string, None)
if root is None:
logger.warning(f"Storage version not found, updating version to {configs_version}.")
logger.warning(f"Connections cleared, repeat setup procedure.")
app.storage.general[configs_version_string] = {}
answers: Dict[str, Any] = app.storage.general[configs_version_string]
if "Default" not in answers:
answers["Default"] = {
"global": {
"keyboard": "de",
"country": "at",
"fqdn": "pveauto.testinstall",
"mailto": "mail@no.invalid",
"timezone": "Europe/Vienna",
"root_password": "123456",
},
"network": {
"source": "from-dhcp",
},
"disk-setup": {
"filesystem": "zfs",
"zfs.raid": "raid1",
"disk_list": ["sda", "sdb"],
},
}
def answer(name: str) -> dict:
if name not in answers:
answers[name] = {}
return answers[name]
# def algo(answer_name: str) -> dict:
# h = answer(answer_name)
# if "algo" not in h:
# h["algo"] = {}
# return h["algo"]
# def algo_sensor(answer_name: str, sensor: str) -> dict:
# a = algo(answer_name)
# if sensor not in a:
# a[sensor] = {}
# if "type" not in a[sensor]:
# a[sensor]["type"] = "curve"
# return a[sensor]
# def curve(answer_name: str, sensor: str) -> dict:
# s = algo_sensor(answer_name, sensor)
# if "curve" not in s:
# s["curve"] = {}
# return s["curve"]
# def curve_speed(answer_name: str, sensor: str, default=None) -> dict:
# c = curve(answer_name, sensor)
# if "speed" not in c:
# if default is None:
# c["speed"] = {
# "Min": None,
# "Low": None,
# "Medium": None,
# "High": None,
# "Max": None,
# }
# else:
# c["speed"] = default
# return c["speed"]
# def curve_temp(answer_name: str, sensor: str, default=None) -> dict:
# c = curve(answer_name, sensor)
# if "temp" not in c:
# if default is None:
# c["temp"] = {
# "Min": 30,
# "Low": 40,
# "Medium": 50,
# "High": 60,
# "Max": 70,
# }
# else:
# c["temp"] = default
# return c["temp"]
# def pid(answer_name: str, sensor: str) -> Dict[str, float]:
# s = algo_sensor(answer_name, sensor)
# if "pid" not in s:
# s["pid"] = {"Kp": 5, "Ki": 0.01, "Kd": 0.1, "Target": 40}
# return s["pid"]
# def pid_coefficient(answer_name: str, sensor: str, coefficient: Literal["Kp", "Ki", "Kd", "Target"]) -> float:
# return pid(answer_name, sensor)[coefficient]

104
autopve/tabs/__init__.py Normal file
View File

@ -0,0 +1,104 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
import autopve.elements as el
from autopve import storage
import logging
logger = logging.getLogger(__name__)
class Tab:
def __init__(self, answer: Optional[str] = None, table: Optional[str] = None) -> None:
self.answer: Optional[str] = answer
self.table: Optional[str] = table
self.picked_keys: Dict["str", Any] = {}
self._build()
def _build(self):
pass
def key_picker(self, keys: Dict[str, Any]):
def keys_controls():
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
key_select = ui.select(list(keys.keys()), label="key", new_value_mode="add-unique", with_input=True)
key_select.tailwind.width("full")
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.current_help = None
self.current_key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True)
self.current_key.bind_value_from(key_select)
with ui.button(icon="help"):
self.current_help = ui.tooltip("NA")
ui.button(icon="add", on_click=lambda: add_key(self.current_key.value))
ui.separator()
self.keys_scroll = ui.scroll_area()
self.keys_scroll.tailwind.width("full").height("[480px]")
self.key_controls = {}
items = storage.answer(self.answer)
if self.table is not None and self.table in items:
for key, value in items[self.table].items():
if isinstance(value, list):
add_key(key, "[" + ",".join(str(v) for v in value) + "]")
else:
add_key(key, str(value))
def add_key(key, value=""):
if key is not None and key != "" and key not in self.picked_keys:
with self.keys_scroll:
with ui.row() as key_row:
key_row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.picked_keys[key] = value
self.key_controls[key] = {
"control": el.FInput(
key,
password=True if key == "root_password" else False,
autocomplete=keys[key]["options"] if "options" in keys[key] else None,
on_change=lambda e, key=key: set_key(key, e.value),
),
"row": key_row,
}
self.key_controls[key]["control"].value = value
if key in keys:
with ui.button(icon="help"):
ui.tooltip(keys[key]["description"])
ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key))
def remove_key(key):
self.keys_scroll.remove(self.key_controls[key]["row"])
del self.picked_keys[key]
del self.key_controls[key]
def set_key(key, value: str):
if len(value) > 0:
if "type" in keys[key]:
if keys[key]["type"] == "list":
self.picked_keys[key] = value[1:-1].split(",")
elif keys[key]["type"] == "int":
self.picked_keys[key] = int(value)
else:
self.picked_keys[key] = value
else:
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
self.picked_keys[key] = value[1:-1].split(",")
elif value.isnumeric():
self.picked_keys[key] = int(value)
else:
self.picked_keys[key] = value
storage.answer(self.answer)[self.table] = self.picked_keys
def key_changed(e):
if self.current_help is not None:
if e.value in keys:
self.current_help.text = keys[e.value]["description"]
else:
self.current_help.text = "NA"
keys_controls()

180
autopve/tabs/history.py Normal file
View File

@ -0,0 +1,180 @@
import asyncio
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
import time
from nicegui import app, ui # type: ignore
from . import Tab
import autopve.elements as el
import logging
logger = logging.getLogger(__name__)
@dataclass(kw_only=True)
class Request:
answer: str
response: str
system_info: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
@property
def name(self) -> str:
if "dmi" in self.system_info:
if "system" in self.system_info["dmi"]:
if "name" in self.system_info["dmi"]["system"]:
return self.system_info["dmi"]["system"]["name"]
return ""
class SelectionConfirm:
def __init__(self, container, label) -> None:
self._container = container
self._label = label
self._visible = None
self._request = None
self._submitted = None
with self._container:
self._label = ui.label(self._label).tailwind().text_color("primary")
self._done = el.IButton(icon="done", on_click=lambda: self.submit("confirm"))
self._cancel = el.IButton(icon="close", on_click=lambda: self.submit("cancel"))
@property
def submitted(self) -> asyncio.Event:
if self._submitted is None:
self._submitted = asyncio.Event()
return self._submitted
def open(self) -> None:
self._container.visible = True
def close(self) -> None:
self._container.visible = False
self._container.clear()
def __await__(self):
self._request = None
self.submitted.clear()
self.open()
yield from self.submitted.wait().__await__() # pylint: disable=no-member
request = self._request
self.close()
return request
def submit(self, request) -> None:
self._request = request
self.submitted.set()
class History(Tab):
_history: List[Dict[str, Any]] = []
def _build(self):
async def display_request(e):
if e.args["data"]["system_info"] is not None and e.args["data"]["response"] is not None:
with ui.dialog() as dialog, el.Card():
with el.DBody(height="fit", width="fit"):
with el.WColumn():
with ui.tabs().classes("w-full") as tabs:
system_info_tab = ui.tab("System Info")
response_tab = ui.tab("Response")
with ui.tab_panels(tabs, value=system_info_tab):
with ui.tab_panel(system_info_tab):
system_info = e.args["data"]["system_info"]
properties = {"content": {"json": system_info}, "readOnly": True}
el.JsonEditor(properties=properties)
with ui.tab_panel(response_tab):
response = e.args["data"]["response"]
ui.code(response).tailwind.height("[640px]").width("[640px]")
with el.WRow() as row:
row.tailwind.height("[40px]")
el.DButton("Exit", on_click=lambda: dialog.submit("exit"))
await dialog
with el.WColumn() as col:
col.tailwind.height("full")
self._confirm = el.WRow()
self._confirm.visible = False
with el.WRow().classes("justify-between").bind_visibility_from(self._confirm, "visible", value=False):
with ui.row().classes("items-center"):
el.SmButton(text="Remove", on_click=self._remove_history)
with ui.row().classes("items-center"):
el.SmButton(text="Refresh", on_click=lambda _: self._grid.update())
self._grid = ui.aggrid(
{
"suppressRowClickSelection": True,
"rowSelection": "multiple",
"paginationAutoPageSize": True,
"pagination": True,
"defaultColDef": {
"resizable": True,
"sortable": True,
"suppressMovable": True,
"sortingOrder": ["asc", "desc"],
},
"columnDefs": [
{
"headerName": "Timestamp",
"field": "timestamp",
"filter": "agTextColumnFilter",
"maxWidth": 125,
":cellRenderer": """(data) => {
var date = new Date(data.value * 1000).toLocaleString(undefined, {dateStyle: 'short', timeStyle: 'short', hour12: false});;
return date;
}""",
"sort": "desc",
},
{
"headerName": "Name",
"field": "name",
"filter": "agTextColumnFilter",
"flex": 1,
},
{
"headerName": "Answer",
"field": "answer",
"filter": "agTextColumnFilter",
"maxWidth": 200,
},
],
"rowData": self._history,
},
theme="balham-dark",
)
self._grid.tailwind().width("full").height("5/6")
self._grid.on("cellClicked", lambda e: display_request(e))
def _set_selection(self, mode=None):
row_selection = "single"
self._grid.options["columnDefs"][0]["headerCheckboxSelection"] = False
self._grid.options["columnDefs"][0]["headerCheckboxSelectionFilteredOnly"] = True
self._grid.options["columnDefs"][0]["checkboxSelection"] = False
if mode is None:
pass
elif mode == "single":
self._grid.options["columnDefs"][0]["checkboxSelection"] = True
elif mode == "multiple":
row_selection = "multiple"
self._grid.options["columnDefs"][0]["headerCheckboxSelection"] = True
self._grid.options["columnDefs"][0]["checkboxSelection"] = True
self._grid.options["rowSelection"] = row_selection
self._grid.update()
def update_history(self):
self._grid.update()
@classmethod
def add_history(cls, request: Request) -> None:
if len(cls._history) > 1000:
cls._history.pop(0)
cls._history.append({"timestamp": request.timestamp, "name": request.name, "answer": request.answer, "response": request.response, "system_info": request.system_info})
async def _remove_history(self):
self._set_selection(mode="multiple")
request = await SelectionConfirm(container=self._confirm, label=">REMOVE<")
if request == "confirm":
rows = await self._grid.get_selected_rows()
for row in rows:
self._history.remove(row)
self._grid.update()
self._set_selection()

80
autopve/tabs/settings.py Normal file
View File

@ -0,0 +1,80 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
from . import Tab
import autopve.elements as el
import logging
logger = logging.getLogger(__name__)
class Global(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
"keyboard": {"description": "The keyboard layout with the following possible options"},
"country": {"description": "The country code in the two letter variant. For example, at, us or fr."},
"fqdn": {"description": "The fully qualified domain name of the host. The domain part will be used as the search domain."},
"mailto": {"description": "The default email address for the user root."},
"timezone": {"description": "The timezone in tzdata format. For example, Europe/Vienna or America/New_York."},
"root_password": {"description": "The password for the root user.", "type": "str"},
"root_ssh_keys": {"description": "Optional. SSH public keys to add to the root users authorized_keys file after the installation."},
"reboot_on_error": {
"description": "If set to true, the installer will reboot automatically when an error is encountered. The default behavior is to wait to give the administrator a chance to investigate why the installation failed."
},
}
super().__init__(answer=answer, table="global")
def _build(self):
self.key_picker(keys=self.keys)
class Network(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
"source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer."},
"cidr": {"description": "The IP address in CIDR notation. For example, 192.168.1.10/24."},
"dns": {"description": "The IP address of the DNS server."},
"gateway": {"description": "The IP address of the default gateway."},
"filter": {"description": "Filter against the UDEV properties to select the network card. See filters."},
}
super().__init__(answer=answer, table="network")
def _build(self):
self.key_picker(keys=self.keys)
class Disk(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
"filesystem": {"description": "One of the following options: ext4, xfs, zfs, or btrfs.", "options": ["ext4", "xfs", "zfs", "btrfs"]},
"disk_list": {"description": 'List of disks to use. Useful if you are sure about the disk names. For example: disk_list = ["sda", "sdb"].'},
"filter": {"description": "Filter against UDEV properties to select the disks for the installation. See filters."},
"filter_match": {
"description": 'Can be "any" or "all". Decides if a match of any filter is enough of if all filters need to match for a disk to be selected. Default is "any".',
"options": ["any", "all"],
},
"zfs.raid": {
"description": "The RAID level that should be used. Options are raid0, raid1, raid10, raidz-1, raidz-2, or raidz-3.",
"options": ["raid0", "raid1", "raid10", "raidz-1", "raidz-2", "raidz-3"],
},
"zfs.ashift": {"description": ""},
"zfs.arc_max": {"description": ""},
"zfs.checksum": {"description": ""},
"zfs.compress": {"description": ""},
"zfs.copies": {"description": ""},
"zfs.hdsize": {"description": ""},
"lvm.hdsize": {"description": ""},
"lvm.swapsize": {"description": ""},
"lvm.maxroot": {"description": ""},
"lvm.maxvz": {"description": ""},
"lvm.minfree": {"description": ""},
"btrfs.raid": {
"description": "",
"options": ["raid0", "raid1", "raid10"],
},
"btrfs.hdsize": {"description": ""},
}
super().__init__(answer=answer, table="disk-setup")
def _build(self):
self.key_picker(keys=self.keys)

21
autopve/tabs/system.py Normal file
View File

@ -0,0 +1,21 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
from . import Tab
import autopve.elements as el
from autopve import storage
import logging
logger = logging.getLogger(__name__)
class System(Tab):
def __init__(self, answer=None) -> None:
super().__init__(answer)
def _build(self):
def set_match(match: str):
storage.answer(self.answer)["match"] = match
answer = storage.answer(self.answer)
el.FInput("Match String", value=answer["match"] if "match" in answer else "", on_change=lambda e: set_match(e.value))