32 Commits
v0.0.3 ... main

Author SHA1 Message Date
140d1d803f added root-password-hashed 2025-06-26 19:08:10 -04:00
ef2ea6eedd remove cleartext root password from settings and history 2025-06-26 19:00:09 -04:00
b790be713a check for disk_list before removing 2024-10-12 20:33:00 -04:00
6809b34b9d Merge pull request #9 from Konrni/dev
use dropdown and add more descriptions
2024-10-07 21:22:13 -04:00
65bd553b09 enable arbitrary values for key selections 2024-10-07 10:51:13 -04:00
00fe5bf24c black 2024-10-07 10:30:04 -04:00
4b6cfeddad add descriptions from https://pve.proxmox.com/pve-docs/chapter-pve-installation.html#advanced_lvm_options
#advanced_lvm_options and https://pve.proxmox.com/wiki/Automated_Installation
2024-10-04 21:49:44 +02:00
6b0d81d4e9 use dropdown menu, when options are available. change FSelect to match FInput width 2024-10-02 22:18:13 +02:00
a3e07033e7 fixed stripping spaces for list values 2024-10-02 12:23:20 -04:00
5a3bb026d4 clean up 2024-10-01 21:16:16 -04:00
7aa752c3d3 improved list value handling 2024-10-01 21:14:53 -04:00
6b412140d4 Merge pull request #8 from Konrni/main
Add copy feature , fix a small bug
2024-09-30 21:37:35 -04:00
61157457b8 refactor answer selection actions 2024-09-29 17:36:37 -04:00
46cae312c8 Merge branch 'main' of https://github.com/Konrni/autopve 2024-09-29 19:16:12 +02:00
aa04718374 revert answer naming changes to allow submiting previous name 2024-09-28 16:46:32 -04:00
4c6495c4ee allow escape at answer naming 2024-09-28 16:45:48 -04:00
8f00ab9570 add password_toggle_button, enable password viewing 2024-09-28 10:34:30 +02:00
32dec36cd7 don't "unselect" after one answer deletion 2024-09-28 10:27:46 +02:00
f9b5c0d153 simplify answer storage logic and enable copy 2024-09-27 21:23:22 -04:00
b3dc60d950 improved answer sanity check 2024-09-27 21:21:47 -04:00
f8e93cedf2 syntax change 2024-09-27 21:20:30 -04:00
074975650f fix bug, where save after unchanged name would delete answer content 2024-09-28 00:01:29 +02:00
7f654071d0 added copy feature for answers 2024-09-27 23:51:13 +02:00
0711b55ad8 Update README.md 2024-05-12 15:01:46 -04:00
779d25def5 added disk-setup specfic sanity checks 2024-05-12 14:17:43 -04:00
bba6410bbd bind settings methods to class 2024-05-12 14:15:52 -04:00
edc5987293 Merge pull request #2 from natankeddem/fixremoves
Fixes for removal of keys and restrictions.
2024-05-06 18:53:31 -04:00
2b6f0e41e6 fixed removal of restrictions 2024-05-06 18:50:44 -04:00
444c2d3182 fixed removal of keys 2024-05-06 18:50:13 -04:00
a069cfe410 Update README.md 2024-05-05 16:19:45 -04:00
005af9e3da project refactor 2024-05-05 16:03:46 -04:00
c927ef4c2f enable prettier 2024-05-03 19:30:35 -04:00
22 changed files with 677 additions and 363 deletions

27
.devcontainer/Dockerfile Normal file
View File

@ -0,0 +1,27 @@
FROM python:3.12.2-bookworm
ENV DEBIAN_FRONTEND=noninteractive \
DISPLAY=:99 \
NICEGUI_STORAGE_PATH=data
# Install packages
RUN apt-get update && apt-get install --no-install-recommends -y \
sudo git build-essential chromium chromium-driver python3-pip\
&& rm -rf /var/lib/apt/lists/*
# Create remote user
ARG USERNAME=vscode
ARG USER_UID=1000
ARG USER_GID=$USER_UID
RUN groupadd --gid $USER_GID $USERNAME \
&& useradd --uid $USER_UID --gid $USER_GID -m $USERNAME \
&& echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME \
&& chmod 0440 /etc/sudoers.d/$USERNAME
ENV PATH="/home/${USERNAME}/.local/bin:${PATH}"
ENV CHROME_BINARY_LOCATION=/usr/bin/chromium
USER $USERNAME
CMD ["python", "wait.py"]

View File

@ -0,0 +1,38 @@
// For format details, see https://aka.ms/devcontainer.json.
{
"name": "autopve-dev",
"build": {
"context": "..",
"dockerfile": "Dockerfile"
},
"customizations": {
"vscode": {
"extensions": [
"cschleiden.vscode-github-actions",
"esbenp.prettier-vscode",
"littlefoxteam.vscode-python-test-adapter",
"ms-python.python",
"samuelcolvin.jinjahtml",
"Vue.volar"
],
"settings": {
"python.testing.cwd": "/workspaces/autopve/",
"python.testing.pytestArgs": ["tests"],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.defaultInterpreterPath": "/usr/local/bin/python3",
"terminal.integrated.defaultProfile.linux": "bash",
"terminal.integrated.shell.linux": "bash",
"terminal.integrated.profiles.linux": {
"bash (container default)": {
"path": "/usr/bin/bash",
"overrideName": true
}
}
}
}
},
// More info: https://aka.ms/dev-containers-non-root.
"remoteUser": "vscode",
"postCreateCommand": "/usr/local/bin/python3 -m pip install -r requirements-test.txt"
}

View File

@ -7,5 +7,6 @@ __pycache__/
logs/
notes/
mpl/
screenshots/
mysecret.py
test.py

3
.gitignore vendored
View File

@ -168,4 +168,5 @@ notes/
mpl/
mysecret.py
test.py
mount/
mount/
screenshots/

View File

@ -2,7 +2,7 @@
## Demo
[autopve_demo.webm](https://github.com/natankeddem/autopve/assets/44515217/827bdd22-5311-43c1-9452-a56fa11998aa)
https://github.com/natankeddem/autopve/assets/44515217/9439e2e2-a7bf-4677-aea8-0684318bea6c
## Information

View File

View File

@ -43,6 +43,9 @@
"terminal.integrated.enablePersistentSessions": true,
"[json]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
},
"[jsonc]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
}
}
}

View File

@ -1,11 +1,10 @@
import asyncio
from nicegui import ui # type: ignore
from autopve import elements as el
import autopve.logo as logo
from autopve.tabs import Tab
from autopve import logo as logo
from autopve.tabs.settings import Global, Network, Disk
from autopve.tabs.history import History
from autopve.tabs.system import System
from autopve.tabs.system import MustContain, MustNotContain
import logging
logger = logging.getLogger(__name__)
@ -42,7 +41,8 @@ class Content:
self._tab["disk"] = ui.tab(name="Disk").classes("text-secondary")
self._tab["history"] = ui.tab(name="History").classes("text-secondary")
if self._answer != "Default":
self._tab["system"] = ui.tab(name="System").classes("text-secondary")
self._tab["must_contain"] = ui.tab(name="Must Contain").classes("text-secondary")
self._tab["must_not_contain"] = ui.tab(name="Must Not Contain").classes("text-secondary")
with ui.row().classes("items-center"):
self._answer_display = ui.label(self._answer).classes("text-secondary text-h4")
logo.show()
@ -52,7 +52,11 @@ class Content:
async def _tab_changed(self, e):
if e.value == "History":
self._history.update_history()
self._history.update()
elif e.value == "Must Contain":
self._must_contain.update()
elif e.value == "Must Not Contain":
self._must_not_contain.update()
def _build_tab_panels(self):
self._tab_panels.clear()
@ -62,7 +66,8 @@ class Content:
self._disk_content = el.ContentTabPanel(self._tab["disk"])
self._history_content = el.ContentTabPanel(self._tab["history"])
if self._answer != "Default":
self._system_content = el.ContentTabPanel(self._tab["system"])
self._must_contain_content = el.ContentTabPanel(self._tab["must_contain"])
self._must_not_contain_content = el.ContentTabPanel(self._tab["must_not_contain"])
with self._global_content:
self._global = Global(answer=self._answer)
with self._network_content:
@ -72,8 +77,10 @@ class Content:
with self._history_content:
self._history = History(answer=self._answer)
if self._answer != "Default":
with self._system_content:
self._system = System(answer=self._answer)
with self._must_contain_content:
self._must_contain = MustContain(answer=self._answer)
with self._must_not_contain_content:
self._must_not_contain = MustNotContain(answer=self._answer)
async def answer_selected(self, name):
self._answer = name

View File

@ -1,9 +1,8 @@
from typing import Optional
from nicegui.events import KeyEventArguments
from nicegui import ui # type: ignore
from autopve import elements as el
from autopve import storage
from autopve.tabs import Tab
import logging
logger = logging.getLogger(__name__)
@ -45,6 +44,7 @@ class Drawer(object):
el.IButton(icon="add", on_click=self._display_answer_dialog)
self._buttons["remove"] = el.IButton(icon="remove", on_click=lambda: self._modify_answer("remove"))
self._buttons["edit"] = el.IButton(icon="edit", on_click=lambda: self._modify_answer("edit"))
self._buttons["content_copy"] = el.IButton(icon="content_copy", on_click=lambda: self._modify_answer("content_copy"))
ui.label(text="ANSWERS").classes("text-secondary")
self._table = (
ui.table(
@ -84,11 +84,11 @@ class Drawer(object):
self._table.add_rows({"name": name})
self._table.visible = True
async def _display_answer_dialog(self, name=""):
async def _display_answer_dialog(self, name="", copy=False):
save = None
with ui.dialog() as answer_dialog, el.Card():
with el.DBody(height="[95vh]", width="[360px]"):
with el.DBody(height="fit", width="[320px]"):
with el.WColumn():
all_answers = list(storage.answers.keys())
for answer in list(storage.answers.keys()):
@ -101,26 +101,38 @@ class Drawer(object):
def answer_check(value: str) -> Optional[bool]:
spaceless = value.replace(" ", "")
if len(spaceless) == 0:
return False
for invalid_value in all_answers:
if invalid_value == spaceless:
return False
return None
def enter_submit(e: KeyEventArguments) -> None:
if e.key == "Enter" and save_ea.no_errors is True:
answer_dialog.submit("save")
elif e.key == "Escape":
answer_dialog.close()
answer_input = el.VInput(label="answer", value=" ", invalid_characters="""'`"$\\;&<>|(){}""", invalid_values=all_answers, check=answer_check, max_length=20)
save_ea = el.ErrorAggregator(answer_input)
el.DButton("SAVE", on_click=lambda: answer_dialog.submit("save")).bind_enabled_from(save_ea, "no_errors")
answer_input.value = name
ui.keyboard(on_key=enter_submit, ignore=[])
answer_input.value = name
result = await answer_dialog
if result == "save":
answer = answer_input.value.strip()
if len(answer) > 0 and name != "Default":
if name in storage.answers:
answer = answer_input.value.strip()
if result == "save" and name != answer:
if name in storage.answers:
storage.answers[answer] = storage.answer(name, copy=True)
if copy is False:
del storage.answers[name]
for row in self._table.rows:
if name == row["name"]:
self._table.remove_rows(row)
self._add_answer_to_table(answer)
for row in self._table.rows:
if name == row["name"]:
self._table.remove_rows(row)
else:
storage.answer(answer)
self._add_answer_to_table(answer)
def _modify_answer(self, mode):
self._hide_content()
@ -146,17 +158,20 @@ class Drawer(object):
async def _selected(self, e):
self._hide_content()
if self._selection_mode == "edit":
if len(e.selection) > 0 and e.selection[0]["name"] != "Default":
await self._display_answer_dialog(name=e.selection[0]["name"])
if self._selection_mode == "remove":
if len(e.selection) > 0:
for row in e.selection:
if row["name"] != "Default":
if row["name"] in storage.answers:
del storage.answers[row["name"]]
self._table.remove_rows(row)
self._modify_answer(None)
if len(e.selection) == 1:
answer = e.selection[0]["name"]
if self._selection_mode == "content_copy":
await self._display_answer_dialog(name=answer, copy=True)
self._modify_answer(None)
elif answer == "Default":
self._table._props["selected"] = []
elif self._selection_mode == "edit":
await self._display_answer_dialog(name=answer)
self._modify_answer(None)
elif self._selection_mode == "remove":
if answer in storage.answers:
del storage.answers[answer]
self._table.remove_rows(e.selection[0])
async def _clicked(self, e):
if "name" in e.args[1]:

View File

@ -364,7 +364,7 @@ class FSelect(ui.select):
clearable: bool = False,
) -> None:
super().__init__(options, label=label, value=value, on_change=on_change, with_input=with_input, new_value_mode=new_value_mode, multiple=multiple, clearable=clearable)
self.tailwind.width("64")
self.tailwind.width("[320px]")
class JsonEditor(ui.json_editor):

View File

@ -1,74 +0,0 @@
from typing import Any, Dict, List, Optional, Union
import asyncio
import copy
import json
import tomlkit
from fastapi import Request
from fastapi.responses import PlainTextResponse
from nicegui import app, Client, ui # type: ignore
from autopve import elements as el
from autopve.drawer import Drawer
from autopve.content import Content
from autopve import storage
import autopve.tabs.history as history
import logging
logger = logging.getLogger(__name__)
def build():
@app.post("/answer")
async def post_answer(request: Request) -> PlainTextResponse:
def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]):
toml = tomlkit.dumps(data)
toml_fixed = ""
for line in toml.splitlines():
if len(line) > 0 and line[0] == '"':
line = line.replace('"', "", 2)
toml_fixed = toml_fixed + line + "\n"
r = history.Request(answer=answer, response=toml_fixed, system_info=copy.deepcopy(system_info))
history.History.add_history(r)
for client in Client.instances.values():
if not client.has_socket_connection:
continue
with client:
el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=30)
return PlainTextResponse(toml_fixed)
system_info = await request.json()
system_info_raw = json.dumps(system_info)
default_data = copy.deepcopy(storage.answer("Default"))
answers = list(storage.answers.keys())
if "Default" in answers:
answers.remove("Default")
for answer in answers:
answer_data = copy.deepcopy(storage.answer(answer))
if "match" in answer_data:
if len(answer_data["match"]) > 0 and answer_data["match"] in system_info_raw:
if "global" in default_data and "global" in answer_data:
default_data["global"].update(answer_data["global"])
if "network" in default_data and "network" in answer_data:
default_data["network"].update(answer_data["network"])
if "disk-setup" in default_data and "disk-setup" in answer_data:
default_data["disk-setup"].update(answer_data["disk-setup"])
return response(answer, system_info, default_data)
return response("Default", system_info, default_data)
@ui.page("/", response_timeout=30)
async def index(client: Client) -> None:
app.add_static_files("/static", "static")
el.load_element_css()
ui.colors(
primary=el.orange,
secondary=el.orange,
accent=el.orange,
dark=el.dark,
positive="#21BA45",
negative="#C10015",
info="#5C8984",
warning="#F2C037",
)
column = ui.column()
content = Content()
drawer = Drawer(column, content.answer_selected, content.hide)
drawer.build()

View File

@ -1,11 +1,10 @@
from typing import Any, Dict, Literal
from typing import Any, Dict
import json
from nicegui import app
import logging
logger = logging.getLogger(__name__)
configs_version = int(102)
configs_version = int(100)
configs_version_string = f"config_{configs_version}"
root = app.storage.general.get(configs_version_string, None)
if root is None:
@ -35,73 +34,10 @@ if "Default" not in answers:
}
def answer(name: str) -> dict:
def answer(name: str, copy: bool = False) -> dict:
if name not in answers:
answers[name] = {}
return answers[name]
# def algo(answer_name: str) -> dict:
# h = answer(answer_name)
# if "algo" not in h:
# h["algo"] = {}
# return h["algo"]
# def algo_sensor(answer_name: str, sensor: str) -> dict:
# a = algo(answer_name)
# if sensor not in a:
# a[sensor] = {}
# if "type" not in a[sensor]:
# a[sensor]["type"] = "curve"
# return a[sensor]
# def curve(answer_name: str, sensor: str) -> dict:
# s = algo_sensor(answer_name, sensor)
# if "curve" not in s:
# s["curve"] = {}
# return s["curve"]
# def curve_speed(answer_name: str, sensor: str, default=None) -> dict:
# c = curve(answer_name, sensor)
# if "speed" not in c:
# if default is None:
# c["speed"] = {
# "Min": None,
# "Low": None,
# "Medium": None,
# "High": None,
# "Max": None,
# }
# else:
# c["speed"] = default
# return c["speed"]
# def curve_temp(answer_name: str, sensor: str, default=None) -> dict:
# c = curve(answer_name, sensor)
# if "temp" not in c:
# if default is None:
# c["temp"] = {
# "Min": 30,
# "Low": 40,
# "Medium": 50,
# "High": 60,
# "Max": 70,
# }
# else:
# c["temp"] = default
# return c["temp"]
# def pid(answer_name: str, sensor: str) -> Dict[str, float]:
# s = algo_sensor(answer_name, sensor)
# if "pid" not in s:
# s["pid"] = {"Kp": 5, "Ki": 0.01, "Kd": 0.1, "Target": 40}
# return s["pid"]
# def pid_coefficient(answer_name: str, sensor: str, coefficient: Literal["Kp", "Ki", "Kd", "Target"]) -> float:
# return pid(answer_name, sensor)[coefficient]
if copy is False:
return answers[name]
else:
return json.loads(json.dumps(answers[name]))

View File

@ -1,104 +1,25 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
import autopve.elements as el
from autopve import storage
import logging
logger = logging.getLogger(__name__)
@dataclass(kw_only=True)
class Share:
history: List[Dict[str, Any]] = field(default_factory=list)
last_timestamp: float = 0
unique_system_information: List[str] = field(default_factory=list)
class Tab:
def __init__(self, answer: Optional[str] = None, table: Optional[str] = None) -> None:
self.answer: Optional[str] = answer
self.table: Optional[str] = table
self.picked_keys: Dict["str", Any] = {}
_share: Share = Share()
def __init__(self, answer: str, type: Optional[str] = None) -> None:
self.answer: str = answer
self.type: Optional[str] = type
self._elements: Dict[str, Any] = {}
self._build()
def _build(self):
pass
def key_picker(self, keys: Dict[str, Any]):
def keys_controls():
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
key_select = ui.select(list(keys.keys()), label="key", new_value_mode="add-unique", with_input=True)
key_select.tailwind.width("full")
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.current_help = None
self.current_key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True)
self.current_key.bind_value_from(key_select)
with ui.button(icon="help"):
self.current_help = ui.tooltip("NA")
ui.button(icon="add", on_click=lambda: add_key(self.current_key.value))
ui.separator()
self.keys_scroll = ui.scroll_area()
self.keys_scroll.tailwind.width("full").height("[480px]")
self.key_controls = {}
items = storage.answer(self.answer)
if self.table is not None and self.table in items:
for key, value in items[self.table].items():
if isinstance(value, list):
add_key(key, "[" + ",".join(str(v) for v in value) + "]")
else:
add_key(key, str(value))
def add_key(key, value=""):
if key is not None and key != "" and key not in self.picked_keys:
with self.keys_scroll:
with ui.row() as key_row:
key_row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.picked_keys[key] = value
self.key_controls[key] = {
"control": el.FInput(
key,
password=True if key == "root_password" else False,
autocomplete=keys[key]["options"] if key in keys and "options" in keys[key] else None,
on_change=lambda e, key=key: set_key(key, e.value),
),
"row": key_row,
}
self.key_controls[key]["control"].value = value
if key in keys:
with ui.button(icon="help"):
ui.tooltip(keys[key]["description"])
ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key))
def remove_key(key):
self.keys_scroll.remove(self.key_controls[key]["row"])
del self.picked_keys[key]
del self.key_controls[key]
def set_key(key, value: str):
if len(value) > 0:
if key in keys and "type" in keys[key]:
if keys[key]["type"] == "list":
self.picked_keys[key] = value[1:-1].split(",")
elif keys[key]["type"] == "int":
self.picked_keys[key] = int(value)
else:
self.picked_keys[key] = value
else:
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
self.picked_keys[key] = value[1:-1].split(",")
elif value.isnumeric():
self.picked_keys[key] = int(value)
else:
self.picked_keys[key] = value
storage.answer(self.answer)[self.table] = self.picked_keys
def key_changed(e):
if self.current_help is not None:
if e.value in keys:
self.current_help.text = keys[e.value]["description"]
else:
self.current_help.text = "NA"
keys_controls()

View File

@ -2,9 +2,11 @@ import asyncio
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
import time
import json
import re
from nicegui import app, ui # type: ignore
from . import Tab
import autopve.elements as el
from autopve import elements as el
import logging
logger = logging.getLogger(__name__)
@ -32,7 +34,7 @@ class SelectionConfirm:
self._label = label
self._visible = None
self._request = None
self._submitted = None
self._submitted: Optional[asyncio.Event] = None
with self._container:
self._label = ui.label(self._label).tailwind().text_color("primary")
self._done = el.IButton(icon="done", on_click=lambda: self.submit("confirm"))
@ -66,8 +68,6 @@ class SelectionConfirm:
class History(Tab):
_history: List[Dict[str, Any]] = []
def _build(self):
async def display_request(e):
if e.args["data"]["system_info"] is not None and e.args["data"]["response"] is not None:
@ -84,6 +84,15 @@ class History(Tab):
el.JsonEditor(properties=properties)
with ui.tab_panel(response_tab):
response = e.args["data"]["response"]
lines = response.splitlines()
response_lines = []
for line in lines:
if line.strip().startswith("root_password"):
response_lines.append('root_password = "SECRET"')
else:
response_lines.append(line)
response = "\n".join(response_lines)
ui.code(response).tailwind.height("[320px]").width("[640px]")
with el.WRow() as row:
@ -137,7 +146,7 @@ class History(Tab):
"maxWidth": 200,
},
],
"rowData": self._history,
"rowData": self._share.history,
},
theme="balham-dark",
)
@ -160,14 +169,27 @@ class History(Tab):
self._grid.options["rowSelection"] = row_selection
self._grid.update()
def update_history(self):
def update(self):
self._grid.update()
@classmethod
def add_history(cls, request: Request) -> None:
if len(cls._history) > 1000:
cls._history.pop(0)
cls._history.append({"timestamp": request.timestamp, "name": request.name, "answer": request.answer, "response": request.response, "system_info": request.system_info})
if len(cls._share.history) > 1000:
cls._share.history.pop(0)
cls._share.history.append(
{
"timestamp": request.timestamp,
"name": request.name,
"answer": request.answer,
"response": request.response,
"system_info": request.system_info,
}
)
cls._share.last_timestamp = request.timestamp
matches = re.findall(r"(\"[^\"]+\"\s*:\s*(\"[^\"]+\"|\d+|true|false))", json.dumps(request.system_info))
for match in matches:
if str(match[0]) not in cls._share.unique_system_information:
cls._share.unique_system_information.append(str(match[0]))
async def _remove_history(self):
self._set_selection(mode="multiple")
@ -175,6 +197,6 @@ class History(Tab):
if request == "confirm":
rows = await self._grid.get_selected_rows()
for row in rows:
self._history.remove(row)
self._share.history.remove(row)
self._grid.update()
self._set_selection()

View File

@ -1,51 +1,191 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
from typing import Any, Dict, Optional
from nicegui import ui
from . import Tab
import autopve.elements as el
from autopve import elements as el
from autopve import storage
import logging
logger = logging.getLogger(__name__)
class Global(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
"keyboard": {"description": "The keyboard layout with the following possible options"},
class Setting(Tab):
def __init__(self, answer: str, type: Optional[str] = None, keys: Dict[str, Dict[str, Any]] = {}) -> None:
self.keys: Dict[str, Dict[str, Any]] = keys
super().__init__(answer, type=type)
def _build(self):
self.keys_controls()
def keys_controls(self):
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
key_select = ui.select(list(self.keys.keys()), label="key", new_value_mode="add", with_input=True)
key_select.tailwind.width("full")
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
self.help = None
key = el.FInput(label="key", on_change=lambda e: self.key_changed(e.value), read_only=True)
key.bind_value_from(key_select)
with ui.button(icon="help"):
self.help = ui.tooltip("NA")
ui.button(icon="add", on_click=lambda key=key: self.add_key(key.value))
ui.separator()
self._scroll = ui.scroll_area()
self._scroll.tailwind.width("full").height("[480px]")
items = storage.answer(self.answer)
if self.type is not None and self.type in items:
for key, value in items[self.type].items():
if isinstance(value, list):
self.add_key(key, "[" + ",".join(str(v) for v in value) + "]")
else:
self.add_key(key, str(value))
def add_key(self, key: str, value: str = ""):
if self.key_valid(key) is True:
with self._scroll:
with ui.row() as key_row:
key_row.tailwind.width("full").align_items("center").justify_content("between")
with ui.row() as row:
row.tailwind.align_items("center")
if key in self.keys and "options" in self.keys[key]:
options = self.keys[key]["options"]
if value != "" and value not in self.keys[key]["options"]:
options.append(value)
control = el.FSelect(
label=key,
options=options,
with_input=True,
new_value_mode="add-unique",
on_change=lambda e, key=key: self.set_key(key, e.value) if e.value is not None else None,
)
else:
control = el.FInput(
label=key,
password=True if key == "root_password" else False,
password_toggle_button=False,
on_change=lambda e, key=key: self.set_key(key, e.value),
)
self._elements[key] = {
"control": control,
"row": key_row,
}
if isinstance(control, el.FSelect):
control.value = self.keys[key]["options"][0] if value == "" else value
else:
control.value = value
if key in self.keys:
with ui.button(icon="help"):
ui.tooltip(self.keys[key]["description"])
ui.button(icon="remove", on_click=lambda _, key=key: self.remove_key(key))
def key_valid(self, key: str) -> bool:
if key is not None and key != "" and key not in self._elements.keys():
return True
return False
def remove_key(self, key: str):
self._scroll.remove(self._elements[key]["row"])
del self._elements[key]
if key in storage.answer(self.answer)[self.type]:
del storage.answer(self.answer)[self.type][key]
def set_key(self, key: str, value: str):
v: Any = ""
if len(value) > 0:
if key in self.keys and "type" in self.keys[key]:
if self.keys[key]["type"] == "list" and len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
l = value.strip()[1:-1].replace('"', "").replace("'", "").split(",")
v = [v.strip() for v in l]
elif self.keys[key]["type"] == "int":
v = int(value)
else:
v = value
else:
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
l = value.strip()[1:-1].replace('"', "").replace("'", "").split(",")
v = [v.strip() for v in l]
elif value.isnumeric():
v = int(value)
else:
v = value
if self.type not in storage.answer(self.answer):
storage.answer(self.answer)[self.type] = {}
storage.answer(self.answer)[self.type][key] = v
def key_changed(self, value: str):
if self.help is not None:
if value in self.keys:
self.help.text = self.keys[value]["description"]
else:
self.help.text = "NA"
class Global(Setting):
def __init__(self, answer: str) -> None:
keys = {
"keyboard": {
"description": "The keyboard layout with the following possible options",
"options": [
"de",
"de-ch",
"dk",
"en-gb",
"en-us",
"es",
"fi",
"fr",
"fr-be",
"fr-ca",
"fr-ch",
"hu",
"is",
"it",
"jp",
"lt",
"mk",
"nl",
"no",
"pl",
"pt",
"pt-br",
"se",
"si",
"tr",
],
},
"country": {"description": "The country code in the two letter variant. For example, at, us or fr."},
"fqdn": {"description": "The fully qualified domain name of the host. The domain part will be used as the search domain."},
"mailto": {"description": "The default email address for the user root."},
"timezone": {"description": "The timezone in tzdata format. For example, Europe/Vienna or America/New_York."},
"root_password": {"description": "The password for the root user.", "type": "str"},
"root-password-hashed": {"description": "The pre-hashed password for the root user, which will be written verbatim to /etc/passwd. May be used instead of root_password and can be generated using the mkpasswd tool, for example.", "type": "str"},
"root_ssh_keys": {"description": "Optional. SSH public keys to add to the root users authorized_keys file after the installation."},
"reboot_on_error": {
"description": "If set to true, the installer will reboot automatically when an error is encountered. The default behavior is to wait to give the administrator a chance to investigate why the installation failed."
},
}
super().__init__(answer=answer, table="global")
def _build(self):
self.key_picker(keys=self.keys)
super().__init__(answer, type="global", keys=keys)
class Network(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
"source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer."},
class Network(Setting):
def __init__(self, answer: str) -> None:
keys = {
"source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer.", "options": ["from-dhcp", "from-answer"]},
"cidr": {"description": "The IP address in CIDR notation. For example, 192.168.1.10/24."},
"dns": {"description": "The IP address of the DNS server."},
"gateway": {"description": "The IP address of the default gateway."},
"filter": {"description": "Filter against the UDEV properties to select the network card. See filters."},
}
super().__init__(answer=answer, table="network")
def _build(self):
self.key_picker(keys=self.keys)
super().__init__(answer, type="network", keys=keys)
class Disk(Tab):
def __init__(self, answer: Optional[str] = None) -> None:
self.keys = {
class Disk(Setting):
def __init__(self, answer: str) -> None:
keys = {
"filesystem": {"description": "One of the following options: ext4, xfs, zfs, or btrfs.", "options": ["ext4", "xfs", "zfs", "btrfs"]},
"disk_list": {"description": 'List of disks to use. Useful if you are sure about the disk names. For example: disk_list = ["sda", "sdb"].'},
"filter": {"description": "Filter against UDEV properties to select the disks for the installation. See filters."},
@ -57,24 +197,50 @@ class Disk(Tab):
"description": "The RAID level that should be used. Options are raid0, raid1, raid10, raidz-1, raidz-2, or raidz-3.",
"options": ["raid0", "raid1", "raid10", "raidz-1", "raidz-2", "raidz-3"],
},
"zfs.ashift": {"description": ""},
"zfs.arc_max": {"description": ""},
"zfs.checksum": {"description": ""},
"zfs.compress": {"description": ""},
"zfs.copies": {"description": ""},
"zfs.hdsize": {"description": ""},
"lvm.hdsize": {"description": ""},
"lvm.swapsize": {"description": ""},
"lvm.maxroot": {"description": ""},
"lvm.maxvz": {"description": ""},
"lvm.minfree": {"description": ""},
"zfs.ashift": {
"description": "Defines the ashift value for the created pool. The ashift needs to be set at least to the sector-size of the underlying disks (2 to the power of ashift is the sector-size), or any disk which might be put in the pool (for example the replacement of a defective disk)."
},
"zfs.arc_max": {
"description": "Defines the maximum size the ARC can grow to and thus limits the amount of memory ZFS will use. See also the section on how to limit ZFS memory usage for more details."
},
"zfs.checksum": {"description": "Defines which checksumming algorithm should be used for rpool."},
"zfs.compress": {"description": "Defines whether compression is enabled for rpool."},
"zfs.copies": {
"description": "Defines the copies parameter for rpool. Check the zfs(8) manpage for the semantics, and why this does not replace redundancy on disk-level."
},
"zfs.hdsize": {
"description": "Defines the total hard disk size to be used. This is useful to save free space on the hard disk(s) for further partitioning (for example to create a swap-partition). hdsize is only honored for bootable disks, that is only the first disk or mirror for RAID0, RAID1 or RAID10, and all disks in RAID-Z[123]."
},
"lvm.hdsize": {
"description": "Defines the total hard disk size to be used. This way you can reserve free space on the hard disk for further partitioning (for example for an additional PV and VG on the same hard disk that can be used for LVM storage)."
},
"lvm.swapsize": {
"description": "Defines the size of the swap volume. The default is the size of the installed memory, minimum 4 GB and maximum 8 GB. The resulting value cannot be greater than hdsize/8."
},
"lvm.maxroot": {
"description": "Defines the maximum size of the root volume, which stores the operation system. The maximum limit of the root volume size is hdsize/4."
},
"lvm.maxvz": {
"description": "Defines the maximum size of the data volume. The actual size of the data volume is: datasize = hdsize - rootsize - swapsize - minfree Where datasize cannot be bigger than maxvz."
},
"lvm.minfree": {
"description": "Defines the amount of free space that should be left in the LVM volume group pve. With more than 128GB storage available, the default is 16GB, otherwise hdsize/8 will be used."
},
"btrfs.raid": {
"description": "",
"description": "The RAID level that should be used. Options are raid0, raid1, and raid10",
"options": ["raid0", "raid1", "raid10"],
},
"btrfs.hdsize": {"description": ""},
}
super().__init__(answer=answer, table="disk-setup")
super().__init__(answer, type="disk-setup", keys=keys)
def _build(self):
self.key_picker(keys=self.keys)
def key_valid(self, key: str) -> bool:
if super().key_valid(key) is True:
if "filter" in key and "disk_list" in self._elements.keys():
el.Notification(f"Can not add {key} when disk_list is utilized!", type="negative", timeout=5)
return False
elif key == "disk_list" and any("filter" in k for k in self._elements.keys()):
el.Notification("Can not add disk_list when a filter is utilized!", type="negative", timeout=5)
return False
return True
return False

View File

@ -1,8 +1,7 @@
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
from nicegui import app, ui # type: ignore
from typing import Optional
from nicegui import ui
from . import Tab
import autopve.elements as el
from autopve import elements as el
from autopve import storage
import logging
@ -10,12 +9,75 @@ logger = logging.getLogger(__name__)
class System(Tab):
def __init__(self, answer=None) -> None:
super().__init__(answer)
def __init__(self, answer: str, type: Optional[str] = None, note: str = "") -> None:
self.note: str = note
self.select: Optional[ui.select] = None
self.last_update_timestamp: float = 0
super().__init__(answer, type=type)
def _build(self):
def set_match(match: str):
storage.answer(self.answer)["match"] = match
self.restriction_picker()
answer = storage.answer(self.answer)
el.FInput("Match String", value=answer["match"] if "match" in answer else "", on_change=lambda e: set_match(e.value))
def restriction_picker(self):
def restriction_controls():
with ui.column() as col:
col.tailwind.width("[560px]").align_items("center")
with ui.card() as card:
card.tailwind.width("full")
self.select = ui.select(self._share.unique_system_information, new_value_mode="add", with_input=True)
self.select.tailwind.width("full")
card.on("mousemove", handler=self.update, throttle=3)
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
restriction = el.FInput(read_only=True)
restriction.tailwind.width("[420px]")
restriction.bind_value_from(self.select)
ui.button(icon="add", on_click=lambda restriction=restriction: add_restriction(restriction.value))
ui.label(self.note).tailwind.align_self("center")
ui.separator()
self.scroll = ui.scroll_area()
self.scroll.tailwind.width("full").height("[480px]")
restrictions = []
if self.type in storage.answer(self.answer):
restrictions = storage.answer(self.answer)[self.type]
for restriction in restrictions:
add_restriction(restriction)
def add_restriction(restriction: str):
if restriction is not None and restriction.strip() != "" and restriction not in self._elements.keys():
with self.scroll:
with ui.row() as row:
row.tailwind.width("full").align_items("center").justify_content("between")
self._elements[restriction] = {
"control": el.FInput(value=restriction, read_only=True),
"row": row,
}
self._elements[restriction]["control"].tailwind.width("[420px]")
ui.button(icon="remove", on_click=lambda _, r=restriction: remove_restriction(r))
if self.type not in storage.answer(self.answer):
storage.answer(self.answer)[self.type] = []
if restriction not in storage.answer(self.answer)[self.type]:
storage.answer(self.answer)[self.type].append(restriction)
def remove_restriction(restriction):
self.scroll.remove(self._elements[restriction]["row"])
del self._elements[restriction]
if restriction in storage.answer(self.answer)[self.type]:
storage.answer(self.answer)[self.type].remove(restriction)
restriction_controls()
def update(self):
if self.select is not None and self._share.last_timestamp > self.last_update_timestamp:
self.last_update_timestamp = self._share.last_timestamp
self.select.update()
class MustContain(System):
def __init__(self, answer: str) -> None:
super().__init__(answer, type="must_contain", note="The system information must contain at least one of these strings.")
class MustNotContain(System):
def __init__(self, answer: str) -> None:
super().__init__(answer, type="must_not_contain", note="The system information must not contain any of these strings.")

95
main.py
View File

@ -2,8 +2,12 @@ import mylogging
import logging
logger = logging.getLogger(__name__)
from typing import Any, Dict
import json
import tomlkit
import os
os.environ.setdefault("NICEGUI_STORAGE_PATH", "data")
if not os.path.exists("data"):
logger.warning("Could not find 'data' directory, verify bind mounts.")
if os.path.exists(".nicegui"):
@ -14,12 +18,14 @@ if not os.path.exists("data"):
os.makedirs("data")
else:
logger.warning("Found 'data' directory.")
os.environ.setdefault("NICEGUI_STORAGE_PATH", "data")
from fastapi import Request
from fastapi.responses import PlainTextResponse
from nicegui import app, Client, ui # type: ignore
if __name__ in {"__main__", "__mp_main__"}:
from nicegui import app, ui # type: ignore
@ui.page("/", response_timeout=30)
# async def page(client: Client) -> None:
def page() -> None:
ui.card.default_style("max-width: none")
ui.card.default_props("flat bordered")
ui.input.default_props("outlined dense hide-bottom-space")
@ -29,9 +35,86 @@ if __name__ in {"__main__", "__mp_main__"}:
ui.stepper.default_props("flat")
ui.stepper.default_classes("full-size-stepper")
from autopve import page
import autopve.elements as el
from autopve.drawer import Drawer
from autopve.content import Content
app.add_static_files("/static", "static")
el.load_element_css()
ui.colors(
primary=el.orange,
secondary=el.orange,
accent=el.orange,
dark=el.dark,
positive="#21BA45",
negative="#C10015",
info="#5C8984",
warning="#F2C037",
)
column = ui.column()
content = Content()
drawer = Drawer(column, content.answer_selected, content.hide)
drawer.build()
@app.post("/answer")
async def post_answer(request: Request) -> PlainTextResponse:
import autopve.elements as el
from autopve import storage
from autopve.tabs import history
def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]):
toml = tomlkit.dumps(data)
toml_fixed = ""
for line in toml.splitlines():
if len(line) > 0 and line[0] == '"':
line = line.replace('"', "", 2)
toml_fixed = toml_fixed + line + "\n"
r = history.Request(answer=answer, response=toml_fixed, system_info=system_info)
history.History.add_history(r)
for client in Client.instances.values():
if not client.has_socket_connection:
continue
with client:
el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=15)
return PlainTextResponse(toml_fixed)
system_info = await request.json()
system_info_raw = json.dumps(system_info)
default_data = storage.answer("Default", copy=True)
answers = list(storage.answers.keys())
if "Default" in answers:
answers.remove("Default")
for answer in answers:
answer_data = storage.answer(answer, copy=True)
match = False
if "must_contain" in answer_data:
for entry in answer_data["must_contain"]:
if len(entry) > 0 and entry in system_info_raw:
match = True
if "must_not_contain" in answer_data:
for entry in answer_data["must_not_contain"]:
if len(entry) > 0 and entry in system_info_raw:
match = False
if match is True:
if "global" in default_data and "global" in answer_data:
default_data["global"].update(answer_data["global"])
if "network" in default_data and "network" in answer_data:
default_data["network"].update(answer_data["network"])
if "disk-setup" in default_data and "disk-setup" in answer_data:
if any("filter" in k for k in answer_data["disk-setup"]) and "disk_list" in default_data["disk-setup"]:
del default_data["disk-setup"]["disk_list"]
if "disk_list" in answer_data["disk-setup"]:
for key in list(default_data["disk-setup"].keys()):
if "filter" in key:
del default_data["disk-setup"][key]
default_data["disk-setup"].update(answer_data["disk-setup"])
return response(answer, system_info, default_data)
return response("Default", system_info, default_data)
if __name__ in {"__main__", "__mp_main__"}:
from autopve import logo
app.on_startup(lambda: print(f"Starting autopve, bound to the following addresses {', '.join(app.urls)}.", flush=True))
page.build()
ui.run(title="autopve", favicon=logo.logo, dark=True, reload=False, show=False, show_welcome_message=False)

3
pyproject.toml Normal file
View File

@ -0,0 +1,3 @@
[tool.pytest.ini_options]
addopts = "--driver Chrome"
asyncio_mode = "auto"

55
requirements-test.txt Normal file
View File

@ -0,0 +1,55 @@
aiofiles==23.2.1
aiohttp==3.9.5
aiosignal==1.3.1
annotated-types==0.6.0
anyio==4.3.0
attrs==23.2.0
bidict==0.23.1
certifi==2024.2.2
charset-normalizer==3.3.2
click==8.1.7
docutils==0.19
fastapi==0.109.2
frozenlist==1.4.1
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
idna==3.7
ifaddr==0.2.0
itsdangerous==2.2.0
Jinja2==3.1.3
markdown2==2.4.13
MarkupSafe==2.1.5
multidict==6.0.5
nicegui==1.4.23
orjson==3.10.3
pscript==0.7.7
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dotenv==1.0.1
python-engineio==4.9.0
python-multipart==0.0.9
python-socketio==5.11.2
PyYAML==6.0.1
requests==2.31.0
simple-websocket==1.0.0
sniffio==1.3.1
starlette==0.36.3
tomlkit==0.12.4
typing_extensions==4.11.0
urllib3==2.2.1
uvicorn==0.29.0
uvloop==0.19.0
vbuild==0.8.2
watchfiles==0.21.0
websockets==12.0
wsproto==1.2.0
yarl==1.9.4
pytest
pytest-selenium
pytest-asyncio
selenium
icecream
beautifulsoup4

View File

@ -1,54 +1,46 @@
aiofiles==23.2.1
aiohttp==3.9.3
aiohttp==3.9.5
aiosignal==1.3.1
annotated-types==0.6.0
anyio==4.2.0
anyio==4.3.0
attrs==23.2.0
bidict==0.22.1
black==24.2.0
bidict==0.23.1
certifi==2024.2.2
charset-normalizer==3.3.2
click==8.1.7
docutils==0.19
fastapi==0.109.2
frozenlist==1.4.1
h11==0.14.0
httpcore==1.0.2
httpcore==1.0.5
httptools==0.6.1
httpx==0.26.0
idna==3.6
httpx==0.27.0
idna==3.7
ifaddr==0.2.0
itsdangerous==2.1.2
itsdangerous==2.2.0
Jinja2==3.1.3
markdown2==2.4.12
markdown2==2.4.13
MarkupSafe==2.1.5
multidict==6.0.5
mypy-extensions==1.0.0
nicegui==1.4.15
nicegui-highcharts==1.0.1
numpy==1.26.4
orjson==3.9.13
packaging==23.2
pathspec==0.12.1
platformdirs==4.2.0
nicegui==1.4.23
orjson==3.10.3
pscript==0.7.7
pydantic==2.6.1
pydantic_core==2.16.2
pydantic==2.7.1
pydantic_core==2.18.2
Pygments==2.17.2
python-dotenv==1.0.1
python-engineio==4.9.0
python-multipart==0.0.9
python-socketio==5.11.1
python-socketio==5.11.2
PyYAML==6.0.1
requests==2.31.0
scipy==1.12.0
simple-websocket==1.0.0
sniffio==1.3.0
sniffio==1.3.1
starlette==0.36.3
tomlkit==0.12.4
types-requests==2.31.0.20240125
typing_extensions==4.9.0
urllib3==2.2.0
uvicorn==0.27.1
typing_extensions==4.11.0
urllib3==2.2.1
uvicorn==0.29.0
uvloop==0.19.0
vbuild==0.8.2
watchfiles==0.21.0

52
tests/conftest.py Normal file
View File

@ -0,0 +1,52 @@
# from nicegui.testing.conftest import *
from collections.abc import Generator
import os
import pytest
from nicegui.testing import Screen
from nicegui.testing.conftest import (
capabilities, # noqa: F401
driver, # noqa: F401
remove_all_screenshots, # noqa: F401
reset_globals, # noqa: F401
DOWNLOAD_DIR,
)
from selenium import webdriver
@pytest.fixture
def chrome_options(chrome_options: webdriver.ChromeOptions) -> webdriver.ChromeOptions:
"""Configure the Chrome driver options."""
chrome_options.add_argument("disable-dev-shm-using")
chrome_options.add_argument("no-sandbox")
chrome_options.add_argument("headless")
# check if we are running on GitHub Actions
if "GITHUB_ACTIONS" in os.environ:
chrome_options.add_argument("disable-gpu")
else:
chrome_options.add_argument("--use-gl=angle")
chrome_options.add_argument("window-size=1920x1080")
chrome_options.add_experimental_option(
"prefs",
{
"download.default_directory": str(DOWNLOAD_DIR),
"download.prompt_for_download": False, # To auto download the file
"download.directory_upgrade": True,
},
)
if "CHROME_BINARY_LOCATION" in os.environ:
chrome_options.binary_location = os.environ["CHROME_BINARY_LOCATION"]
return chrome_options
@pytest.fixture
def screen(
driver: webdriver.Chrome, # noqa: F811
request: pytest.FixtureRequest,
caplog: pytest.LogCaptureFixture,
) -> Generator[Screen, None, None]:
"""Create a new Screen instance."""
screen_ = Screen(driver, caplog)
yield screen_
if screen_.is_open:
screen_.shot(request.node.name)
screen_.stop_server()

4
wait.py Normal file
View File

@ -0,0 +1,4 @@
import time
while True:
time.sleep(10)