Compare commits
35 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 140d1d803f | |||
| ef2ea6eedd | |||
| b790be713a | |||
| 6809b34b9d | |||
| 65bd553b09 | |||
| 00fe5bf24c | |||
| 4b6cfeddad | |||
| 6b0d81d4e9 | |||
| a3e07033e7 | |||
| 5a3bb026d4 | |||
| 7aa752c3d3 | |||
| 6b412140d4 | |||
| 61157457b8 | |||
| 46cae312c8 | |||
| aa04718374 | |||
| 4c6495c4ee | |||
| 8f00ab9570 | |||
| 32dec36cd7 | |||
| f9b5c0d153 | |||
| b3dc60d950 | |||
| f8e93cedf2 | |||
| 074975650f | |||
| 7f654071d0 | |||
| 0711b55ad8 | |||
| 779d25def5 | |||
| bba6410bbd | |||
| edc5987293 | |||
| 2b6f0e41e6 | |||
| 444c2d3182 | |||
| a069cfe410 | |||
| 005af9e3da | |||
| c927ef4c2f | |||
| 9cbfb0f25f | |||
| db3ed868e6 | |||
| 489ecbdeef |
27
.devcontainer/Dockerfile
Normal file
27
.devcontainer/Dockerfile
Normal file
@ -0,0 +1,27 @@
|
||||
FROM python:3.12.2-bookworm
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive \
|
||||
DISPLAY=:99 \
|
||||
NICEGUI_STORAGE_PATH=data
|
||||
|
||||
# Install packages
|
||||
RUN apt-get update && apt-get install --no-install-recommends -y \
|
||||
sudo git build-essential chromium chromium-driver python3-pip\
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Create remote user
|
||||
ARG USERNAME=vscode
|
||||
ARG USER_UID=1000
|
||||
ARG USER_GID=$USER_UID
|
||||
|
||||
RUN groupadd --gid $USER_GID $USERNAME \
|
||||
&& useradd --uid $USER_UID --gid $USER_GID -m $USERNAME \
|
||||
&& echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME \
|
||||
&& chmod 0440 /etc/sudoers.d/$USERNAME
|
||||
|
||||
ENV PATH="/home/${USERNAME}/.local/bin:${PATH}"
|
||||
ENV CHROME_BINARY_LOCATION=/usr/bin/chromium
|
||||
|
||||
USER $USERNAME
|
||||
|
||||
CMD ["python", "wait.py"]
|
||||
38
.devcontainer/devcontainer.json
Normal file
38
.devcontainer/devcontainer.json
Normal file
@ -0,0 +1,38 @@
|
||||
// For format details, see https://aka.ms/devcontainer.json.
|
||||
{
|
||||
"name": "autopve-dev",
|
||||
"build": {
|
||||
"context": "..",
|
||||
"dockerfile": "Dockerfile"
|
||||
},
|
||||
"customizations": {
|
||||
"vscode": {
|
||||
"extensions": [
|
||||
"cschleiden.vscode-github-actions",
|
||||
"esbenp.prettier-vscode",
|
||||
"littlefoxteam.vscode-python-test-adapter",
|
||||
"ms-python.python",
|
||||
"samuelcolvin.jinjahtml",
|
||||
"Vue.volar"
|
||||
],
|
||||
"settings": {
|
||||
"python.testing.cwd": "/workspaces/autopve/",
|
||||
"python.testing.pytestArgs": ["tests"],
|
||||
"python.testing.unittestEnabled": false,
|
||||
"python.testing.pytestEnabled": true,
|
||||
"python.defaultInterpreterPath": "/usr/local/bin/python3",
|
||||
"terminal.integrated.defaultProfile.linux": "bash",
|
||||
"terminal.integrated.shell.linux": "bash",
|
||||
"terminal.integrated.profiles.linux": {
|
||||
"bash (container default)": {
|
||||
"path": "/usr/bin/bash",
|
||||
"overrideName": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
// More info: https://aka.ms/dev-containers-non-root.
|
||||
"remoteUser": "vscode",
|
||||
"postCreateCommand": "/usr/local/bin/python3 -m pip install -r requirements-test.txt"
|
||||
}
|
||||
@ -7,5 +7,6 @@ __pycache__/
|
||||
logs/
|
||||
notes/
|
||||
mpl/
|
||||
screenshots/
|
||||
mysecret.py
|
||||
test.py
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@ -168,4 +168,5 @@ notes/
|
||||
mpl/
|
||||
mysecret.py
|
||||
test.py
|
||||
mount/
|
||||
mount/
|
||||
screenshots/
|
||||
12
README.md
12
README.md
@ -1,11 +1,12 @@
|
||||
# autopve
|
||||
|
||||
## Demo
|
||||
[autopve_demo.webm](https://github.com/natankeddem/autopve/assets/44515217/1133abe2-97b4-421d-b33f-a833f8b6c904)
|
||||
|
||||
https://github.com/natankeddem/autopve/assets/44515217/9439e2e2-a7bf-4677-aea8-0684318bea6c
|
||||
|
||||
## Information
|
||||
|
||||
GUI configurable web server for Proxmox automated installation.
|
||||
GUI configurable web server for Proxmox automated installation. More information about the automated installation feature built into the Proxmox installation media can be found [here](https://pve.proxmox.com/wiki/Automated_Installation).
|
||||
|
||||
## Features
|
||||
|
||||
@ -44,3 +45,10 @@ GUI configurable web server for Proxmox automated installation.
|
||||
|
||||
The configuration GUI can be accessed at `http://host:8080`. Answers are hosted at `http://host:8080/answer`.
|
||||
|
||||
### OPNsense Setup
|
||||
|
||||
For Unbound you will need to enable TXT records and make an appropriate host override entry.
|
||||

|
||||

|
||||
|
||||
|
||||
|
||||
@ -43,6 +43,9 @@
|
||||
"terminal.integrated.enablePersistentSessions": true,
|
||||
"[json]": {
|
||||
"editor.defaultFormatter": "esbenp.prettier-vscode"
|
||||
},
|
||||
"[jsonc]": {
|
||||
"editor.defaultFormatter": "esbenp.prettier-vscode"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,11 +1,10 @@
|
||||
import asyncio
|
||||
from nicegui import ui # type: ignore
|
||||
from autopve import elements as el
|
||||
import autopve.logo as logo
|
||||
from autopve.tabs import Tab
|
||||
from autopve import logo as logo
|
||||
from autopve.tabs.settings import Global, Network, Disk
|
||||
from autopve.tabs.history import History
|
||||
from autopve.tabs.system import System
|
||||
from autopve.tabs.system import MustContain, MustNotContain
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -42,7 +41,8 @@ class Content:
|
||||
self._tab["disk"] = ui.tab(name="Disk").classes("text-secondary")
|
||||
self._tab["history"] = ui.tab(name="History").classes("text-secondary")
|
||||
if self._answer != "Default":
|
||||
self._tab["system"] = ui.tab(name="System").classes("text-secondary")
|
||||
self._tab["must_contain"] = ui.tab(name="Must Contain").classes("text-secondary")
|
||||
self._tab["must_not_contain"] = ui.tab(name="Must Not Contain").classes("text-secondary")
|
||||
with ui.row().classes("items-center"):
|
||||
self._answer_display = ui.label(self._answer).classes("text-secondary text-h4")
|
||||
logo.show()
|
||||
@ -52,7 +52,11 @@ class Content:
|
||||
|
||||
async def _tab_changed(self, e):
|
||||
if e.value == "History":
|
||||
self._history.update_history()
|
||||
self._history.update()
|
||||
elif e.value == "Must Contain":
|
||||
self._must_contain.update()
|
||||
elif e.value == "Must Not Contain":
|
||||
self._must_not_contain.update()
|
||||
|
||||
def _build_tab_panels(self):
|
||||
self._tab_panels.clear()
|
||||
@ -62,7 +66,8 @@ class Content:
|
||||
self._disk_content = el.ContentTabPanel(self._tab["disk"])
|
||||
self._history_content = el.ContentTabPanel(self._tab["history"])
|
||||
if self._answer != "Default":
|
||||
self._system_content = el.ContentTabPanel(self._tab["system"])
|
||||
self._must_contain_content = el.ContentTabPanel(self._tab["must_contain"])
|
||||
self._must_not_contain_content = el.ContentTabPanel(self._tab["must_not_contain"])
|
||||
with self._global_content:
|
||||
self._global = Global(answer=self._answer)
|
||||
with self._network_content:
|
||||
@ -72,8 +77,10 @@ class Content:
|
||||
with self._history_content:
|
||||
self._history = History(answer=self._answer)
|
||||
if self._answer != "Default":
|
||||
with self._system_content:
|
||||
self._system = System(answer=self._answer)
|
||||
with self._must_contain_content:
|
||||
self._must_contain = MustContain(answer=self._answer)
|
||||
with self._must_not_contain_content:
|
||||
self._must_not_contain = MustNotContain(answer=self._answer)
|
||||
|
||||
async def answer_selected(self, name):
|
||||
self._answer = name
|
||||
|
||||
@ -1,9 +1,8 @@
|
||||
from typing import Optional
|
||||
from nicegui.events import KeyEventArguments
|
||||
from nicegui import ui # type: ignore
|
||||
from autopve import elements as el
|
||||
from autopve import storage
|
||||
from autopve.tabs import Tab
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -45,6 +44,7 @@ class Drawer(object):
|
||||
el.IButton(icon="add", on_click=self._display_answer_dialog)
|
||||
self._buttons["remove"] = el.IButton(icon="remove", on_click=lambda: self._modify_answer("remove"))
|
||||
self._buttons["edit"] = el.IButton(icon="edit", on_click=lambda: self._modify_answer("edit"))
|
||||
self._buttons["content_copy"] = el.IButton(icon="content_copy", on_click=lambda: self._modify_answer("content_copy"))
|
||||
ui.label(text="ANSWERS").classes("text-secondary")
|
||||
self._table = (
|
||||
ui.table(
|
||||
@ -84,11 +84,11 @@ class Drawer(object):
|
||||
self._table.add_rows({"name": name})
|
||||
self._table.visible = True
|
||||
|
||||
async def _display_answer_dialog(self, name=""):
|
||||
async def _display_answer_dialog(self, name="", copy=False):
|
||||
save = None
|
||||
|
||||
with ui.dialog() as answer_dialog, el.Card():
|
||||
with el.DBody(height="[95vh]", width="[360px]"):
|
||||
with el.DBody(height="fit", width="[320px]"):
|
||||
with el.WColumn():
|
||||
all_answers = list(storage.answers.keys())
|
||||
for answer in list(storage.answers.keys()):
|
||||
@ -101,26 +101,38 @@ class Drawer(object):
|
||||
|
||||
def answer_check(value: str) -> Optional[bool]:
|
||||
spaceless = value.replace(" ", "")
|
||||
if len(spaceless) == 0:
|
||||
return False
|
||||
for invalid_value in all_answers:
|
||||
if invalid_value == spaceless:
|
||||
return False
|
||||
return None
|
||||
|
||||
def enter_submit(e: KeyEventArguments) -> None:
|
||||
if e.key == "Enter" and save_ea.no_errors is True:
|
||||
answer_dialog.submit("save")
|
||||
elif e.key == "Escape":
|
||||
answer_dialog.close()
|
||||
|
||||
answer_input = el.VInput(label="answer", value=" ", invalid_characters="""'`"$\\;&<>|(){}""", invalid_values=all_answers, check=answer_check, max_length=20)
|
||||
save_ea = el.ErrorAggregator(answer_input)
|
||||
el.DButton("SAVE", on_click=lambda: answer_dialog.submit("save")).bind_enabled_from(save_ea, "no_errors")
|
||||
answer_input.value = name
|
||||
ui.keyboard(on_key=enter_submit, ignore=[])
|
||||
answer_input.value = name
|
||||
|
||||
result = await answer_dialog
|
||||
if result == "save":
|
||||
answer = answer_input.value.strip()
|
||||
if len(answer) > 0 and name != "Default":
|
||||
if name in storage.answers:
|
||||
answer = answer_input.value.strip()
|
||||
if result == "save" and name != answer:
|
||||
if name in storage.answers:
|
||||
storage.answers[answer] = storage.answer(name, copy=True)
|
||||
if copy is False:
|
||||
del storage.answers[name]
|
||||
for row in self._table.rows:
|
||||
if name == row["name"]:
|
||||
self._table.remove_rows(row)
|
||||
self._add_answer_to_table(answer)
|
||||
for row in self._table.rows:
|
||||
if name == row["name"]:
|
||||
self._table.remove_rows(row)
|
||||
else:
|
||||
storage.answer(answer)
|
||||
self._add_answer_to_table(answer)
|
||||
|
||||
def _modify_answer(self, mode):
|
||||
self._hide_content()
|
||||
@ -146,17 +158,20 @@ class Drawer(object):
|
||||
|
||||
async def _selected(self, e):
|
||||
self._hide_content()
|
||||
if self._selection_mode == "edit":
|
||||
if len(e.selection) > 0 and e.selection[0]["name"] != "Default":
|
||||
await self._display_answer_dialog(name=e.selection[0]["name"])
|
||||
if self._selection_mode == "remove":
|
||||
if len(e.selection) > 0:
|
||||
for row in e.selection:
|
||||
if row["name"] != "Default":
|
||||
if row["name"] in storage.answers:
|
||||
del storage.answers[row["name"]]
|
||||
self._table.remove_rows(row)
|
||||
self._modify_answer(None)
|
||||
if len(e.selection) == 1:
|
||||
answer = e.selection[0]["name"]
|
||||
if self._selection_mode == "content_copy":
|
||||
await self._display_answer_dialog(name=answer, copy=True)
|
||||
self._modify_answer(None)
|
||||
elif answer == "Default":
|
||||
self._table._props["selected"] = []
|
||||
elif self._selection_mode == "edit":
|
||||
await self._display_answer_dialog(name=answer)
|
||||
self._modify_answer(None)
|
||||
elif self._selection_mode == "remove":
|
||||
if answer in storage.answers:
|
||||
del storage.answers[answer]
|
||||
self._table.remove_rows(e.selection[0])
|
||||
|
||||
async def _clicked(self, e):
|
||||
if "name" in e.args[1]:
|
||||
|
||||
@ -364,7 +364,7 @@ class FSelect(ui.select):
|
||||
clearable: bool = False,
|
||||
) -> None:
|
||||
super().__init__(options, label=label, value=value, on_change=on_change, with_input=with_input, new_value_mode=new_value_mode, multiple=multiple, clearable=clearable)
|
||||
self.tailwind.width("64")
|
||||
self.tailwind.width("[320px]")
|
||||
|
||||
|
||||
class JsonEditor(ui.json_editor):
|
||||
|
||||
@ -1,74 +0,0 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
import asyncio
|
||||
import copy
|
||||
import json
|
||||
import tomlkit
|
||||
from fastapi import Request
|
||||
from fastapi.responses import PlainTextResponse
|
||||
from nicegui import app, Client, ui # type: ignore
|
||||
from autopve import elements as el
|
||||
from autopve.drawer import Drawer
|
||||
from autopve.content import Content
|
||||
from autopve import storage
|
||||
import autopve.tabs.history as history
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def build():
|
||||
@app.post("/answer")
|
||||
async def post_answer(request: Request) -> PlainTextResponse:
|
||||
def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]):
|
||||
toml = tomlkit.dumps(default_data)
|
||||
toml_fixed = ""
|
||||
for line in toml.splitlines():
|
||||
if len(line) > 0 and line[0] == '"':
|
||||
line = line.replace('"', "", 2)
|
||||
toml_fixed = toml_fixed + line + "\n"
|
||||
r = history.Request(answer=answer, response=toml_fixed, system_info=dict(system_info))
|
||||
history.History.add_history(r)
|
||||
for client in Client.instances.values():
|
||||
if not client.has_socket_connection:
|
||||
continue
|
||||
with client:
|
||||
el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=30)
|
||||
return PlainTextResponse(toml_fixed)
|
||||
|
||||
system_info = await request.json()
|
||||
system_info_raw = json.dumps(system_info)
|
||||
default_data = dict(storage.answer("Default"))
|
||||
answers = list(storage.answers.keys())
|
||||
if "Default" in answers:
|
||||
answers.remove("Default")
|
||||
for answer in answers:
|
||||
answer_data = dict(storage.answer(answer))
|
||||
if "match" in answer_data:
|
||||
if len(answer_data["match"]) > 0 and answer_data["match"] in system_info_raw:
|
||||
if "global" in default_data and "global" in answer_data:
|
||||
default_data["global"].update(answer_data["global"])
|
||||
if "network" in default_data and "network" in answer_data:
|
||||
default_data["network"].update(answer_data["network"])
|
||||
if "disk-setup" in default_data and "disk-setup" in answer_data:
|
||||
default_data["disk-setup"].update(answer_data["disk-setup"])
|
||||
return response(answer, system_info, default_data)
|
||||
return response("Default", system_info, default_data)
|
||||
|
||||
@ui.page("/", response_timeout=30)
|
||||
async def index(client: Client) -> None:
|
||||
app.add_static_files("/static", "static")
|
||||
el.load_element_css()
|
||||
ui.colors(
|
||||
primary=el.orange,
|
||||
secondary=el.orange,
|
||||
accent=el.orange,
|
||||
dark=el.dark,
|
||||
positive="#21BA45",
|
||||
negative="#C10015",
|
||||
info="#5C8984",
|
||||
warning="#F2C037",
|
||||
)
|
||||
column = ui.column()
|
||||
content = Content()
|
||||
drawer = Drawer(column, content.answer_selected, content.hide)
|
||||
drawer.build()
|
||||
@ -1,11 +1,10 @@
|
||||
from typing import Any, Dict, Literal
|
||||
from typing import Any, Dict
|
||||
import json
|
||||
from nicegui import app
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
configs_version = int(102)
|
||||
configs_version = int(100)
|
||||
configs_version_string = f"config_{configs_version}"
|
||||
root = app.storage.general.get(configs_version_string, None)
|
||||
if root is None:
|
||||
@ -35,73 +34,10 @@ if "Default" not in answers:
|
||||
}
|
||||
|
||||
|
||||
def answer(name: str) -> dict:
|
||||
def answer(name: str, copy: bool = False) -> dict:
|
||||
if name not in answers:
|
||||
answers[name] = {}
|
||||
return answers[name]
|
||||
|
||||
|
||||
# def algo(answer_name: str) -> dict:
|
||||
# h = answer(answer_name)
|
||||
# if "algo" not in h:
|
||||
# h["algo"] = {}
|
||||
# return h["algo"]
|
||||
|
||||
|
||||
# def algo_sensor(answer_name: str, sensor: str) -> dict:
|
||||
# a = algo(answer_name)
|
||||
# if sensor not in a:
|
||||
# a[sensor] = {}
|
||||
# if "type" not in a[sensor]:
|
||||
# a[sensor]["type"] = "curve"
|
||||
# return a[sensor]
|
||||
|
||||
|
||||
# def curve(answer_name: str, sensor: str) -> dict:
|
||||
# s = algo_sensor(answer_name, sensor)
|
||||
# if "curve" not in s:
|
||||
# s["curve"] = {}
|
||||
# return s["curve"]
|
||||
|
||||
|
||||
# def curve_speed(answer_name: str, sensor: str, default=None) -> dict:
|
||||
# c = curve(answer_name, sensor)
|
||||
# if "speed" not in c:
|
||||
# if default is None:
|
||||
# c["speed"] = {
|
||||
# "Min": None,
|
||||
# "Low": None,
|
||||
# "Medium": None,
|
||||
# "High": None,
|
||||
# "Max": None,
|
||||
# }
|
||||
# else:
|
||||
# c["speed"] = default
|
||||
# return c["speed"]
|
||||
|
||||
|
||||
# def curve_temp(answer_name: str, sensor: str, default=None) -> dict:
|
||||
# c = curve(answer_name, sensor)
|
||||
# if "temp" not in c:
|
||||
# if default is None:
|
||||
# c["temp"] = {
|
||||
# "Min": 30,
|
||||
# "Low": 40,
|
||||
# "Medium": 50,
|
||||
# "High": 60,
|
||||
# "Max": 70,
|
||||
# }
|
||||
# else:
|
||||
# c["temp"] = default
|
||||
# return c["temp"]
|
||||
|
||||
|
||||
# def pid(answer_name: str, sensor: str) -> Dict[str, float]:
|
||||
# s = algo_sensor(answer_name, sensor)
|
||||
# if "pid" not in s:
|
||||
# s["pid"] = {"Kp": 5, "Ki": 0.01, "Kd": 0.1, "Target": 40}
|
||||
# return s["pid"]
|
||||
|
||||
|
||||
# def pid_coefficient(answer_name: str, sensor: str, coefficient: Literal["Kp", "Ki", "Kd", "Target"]) -> float:
|
||||
# return pid(answer_name, sensor)[coefficient]
|
||||
if copy is False:
|
||||
return answers[name]
|
||||
else:
|
||||
return json.loads(json.dumps(answers[name]))
|
||||
|
||||
@ -1,104 +1,25 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from dataclasses import dataclass, field
|
||||
from nicegui import app, ui # type: ignore
|
||||
import autopve.elements as el
|
||||
from autopve import storage
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(kw_only=True)
|
||||
class Share:
|
||||
history: List[Dict[str, Any]] = field(default_factory=list)
|
||||
last_timestamp: float = 0
|
||||
unique_system_information: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
class Tab:
|
||||
def __init__(self, answer: Optional[str] = None, table: Optional[str] = None) -> None:
|
||||
self.answer: Optional[str] = answer
|
||||
self.table: Optional[str] = table
|
||||
self.picked_keys: Dict["str", Any] = {}
|
||||
_share: Share = Share()
|
||||
|
||||
def __init__(self, answer: str, type: Optional[str] = None) -> None:
|
||||
self.answer: str = answer
|
||||
self.type: Optional[str] = type
|
||||
self._elements: Dict[str, Any] = {}
|
||||
self._build()
|
||||
|
||||
def _build(self):
|
||||
pass
|
||||
|
||||
def key_picker(self, keys: Dict[str, Any]):
|
||||
def keys_controls():
|
||||
with ui.column() as col:
|
||||
col.tailwind.width("[560px]").align_items("center")
|
||||
with ui.card() as card:
|
||||
card.tailwind.width("full")
|
||||
key_select = ui.select(list(keys.keys()), label="key", new_value_mode="add-unique", with_input=True)
|
||||
key_select.tailwind.width("full")
|
||||
with ui.row() as row:
|
||||
row.tailwind.width("full").align_items("center").justify_content("between")
|
||||
with ui.row() as row:
|
||||
row.tailwind.align_items("center")
|
||||
self.current_help = None
|
||||
self.current_key = el.FInput(label="key", on_change=lambda e: key_changed(e), read_only=True)
|
||||
self.current_key.bind_value_from(key_select)
|
||||
with ui.button(icon="help"):
|
||||
self.current_help = ui.tooltip("NA")
|
||||
ui.button(icon="add", on_click=lambda: add_key(self.current_key.value))
|
||||
ui.separator()
|
||||
self.keys_scroll = ui.scroll_area()
|
||||
self.keys_scroll.tailwind.width("full").height("[480px]")
|
||||
self.key_controls = {}
|
||||
items = storage.answer(self.answer)
|
||||
if self.table is not None and self.table in items:
|
||||
for key, value in items[self.table].items():
|
||||
if isinstance(value, list):
|
||||
add_key(key, "[" + ",".join(str(v) for v in value) + "]")
|
||||
else:
|
||||
add_key(key, str(value))
|
||||
|
||||
def add_key(key, value=""):
|
||||
if key is not None and key != "" and key not in self.picked_keys:
|
||||
with self.keys_scroll:
|
||||
with ui.row() as key_row:
|
||||
key_row.tailwind.width("full").align_items("center").justify_content("between")
|
||||
with ui.row() as row:
|
||||
row.tailwind.align_items("center")
|
||||
self.picked_keys[key] = value
|
||||
self.key_controls[key] = {
|
||||
"control": el.FInput(
|
||||
key,
|
||||
password=True if key == "root_password" else False,
|
||||
autocomplete=keys[key]["options"] if "options" in keys[key] else None,
|
||||
on_change=lambda e, key=key: set_key(key, e.value),
|
||||
),
|
||||
"row": key_row,
|
||||
}
|
||||
self.key_controls[key]["control"].value = value
|
||||
if key in keys:
|
||||
with ui.button(icon="help"):
|
||||
ui.tooltip(keys[key]["description"])
|
||||
ui.button(icon="remove", on_click=lambda _, key=key: remove_key(key))
|
||||
|
||||
def remove_key(key):
|
||||
self.keys_scroll.remove(self.key_controls[key]["row"])
|
||||
del self.picked_keys[key]
|
||||
del self.key_controls[key]
|
||||
|
||||
def set_key(key, value: str):
|
||||
if len(value) > 0:
|
||||
if "type" in keys[key]:
|
||||
if keys[key]["type"] == "list":
|
||||
self.picked_keys[key] = value[1:-1].split(",")
|
||||
elif keys[key]["type"] == "int":
|
||||
self.picked_keys[key] = int(value)
|
||||
else:
|
||||
self.picked_keys[key] = value
|
||||
else:
|
||||
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
|
||||
self.picked_keys[key] = value[1:-1].split(",")
|
||||
elif value.isnumeric():
|
||||
self.picked_keys[key] = int(value)
|
||||
else:
|
||||
self.picked_keys[key] = value
|
||||
storage.answer(self.answer)[self.table] = self.picked_keys
|
||||
|
||||
def key_changed(e):
|
||||
if self.current_help is not None:
|
||||
if e.value in keys:
|
||||
self.current_help.text = keys[e.value]["description"]
|
||||
else:
|
||||
self.current_help.text = "NA"
|
||||
|
||||
keys_controls()
|
||||
|
||||
@ -2,9 +2,11 @@ import asyncio
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from dataclasses import dataclass, field
|
||||
import time
|
||||
import json
|
||||
import re
|
||||
from nicegui import app, ui # type: ignore
|
||||
from . import Tab
|
||||
import autopve.elements as el
|
||||
from autopve import elements as el
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -32,7 +34,7 @@ class SelectionConfirm:
|
||||
self._label = label
|
||||
self._visible = None
|
||||
self._request = None
|
||||
self._submitted = None
|
||||
self._submitted: Optional[asyncio.Event] = None
|
||||
with self._container:
|
||||
self._label = ui.label(self._label).tailwind().text_color("primary")
|
||||
self._done = el.IButton(icon="done", on_click=lambda: self.submit("confirm"))
|
||||
@ -66,8 +68,6 @@ class SelectionConfirm:
|
||||
|
||||
|
||||
class History(Tab):
|
||||
_history: List[Dict[str, Any]] = []
|
||||
|
||||
def _build(self):
|
||||
async def display_request(e):
|
||||
if e.args["data"]["system_info"] is not None and e.args["data"]["response"] is not None:
|
||||
@ -84,6 +84,15 @@ class History(Tab):
|
||||
el.JsonEditor(properties=properties)
|
||||
with ui.tab_panel(response_tab):
|
||||
response = e.args["data"]["response"]
|
||||
lines = response.splitlines()
|
||||
response_lines = []
|
||||
for line in lines:
|
||||
if line.strip().startswith("root_password"):
|
||||
response_lines.append('root_password = "SECRET"')
|
||||
else:
|
||||
response_lines.append(line)
|
||||
|
||||
response = "\n".join(response_lines)
|
||||
ui.code(response).tailwind.height("[320px]").width("[640px]")
|
||||
|
||||
with el.WRow() as row:
|
||||
@ -137,7 +146,7 @@ class History(Tab):
|
||||
"maxWidth": 200,
|
||||
},
|
||||
],
|
||||
"rowData": self._history,
|
||||
"rowData": self._share.history,
|
||||
},
|
||||
theme="balham-dark",
|
||||
)
|
||||
@ -160,14 +169,27 @@ class History(Tab):
|
||||
self._grid.options["rowSelection"] = row_selection
|
||||
self._grid.update()
|
||||
|
||||
def update_history(self):
|
||||
def update(self):
|
||||
self._grid.update()
|
||||
|
||||
@classmethod
|
||||
def add_history(cls, request: Request) -> None:
|
||||
if len(cls._history) > 1000:
|
||||
cls._history.pop(0)
|
||||
cls._history.append({"timestamp": request.timestamp, "name": request.name, "answer": request.answer, "response": request.response, "system_info": request.system_info})
|
||||
if len(cls._share.history) > 1000:
|
||||
cls._share.history.pop(0)
|
||||
cls._share.history.append(
|
||||
{
|
||||
"timestamp": request.timestamp,
|
||||
"name": request.name,
|
||||
"answer": request.answer,
|
||||
"response": request.response,
|
||||
"system_info": request.system_info,
|
||||
}
|
||||
)
|
||||
cls._share.last_timestamp = request.timestamp
|
||||
matches = re.findall(r"(\"[^\"]+\"\s*:\s*(\"[^\"]+\"|\d+|true|false))", json.dumps(request.system_info))
|
||||
for match in matches:
|
||||
if str(match[0]) not in cls._share.unique_system_information:
|
||||
cls._share.unique_system_information.append(str(match[0]))
|
||||
|
||||
async def _remove_history(self):
|
||||
self._set_selection(mode="multiple")
|
||||
@ -175,6 +197,6 @@ class History(Tab):
|
||||
if request == "confirm":
|
||||
rows = await self._grid.get_selected_rows()
|
||||
for row in rows:
|
||||
self._history.remove(row)
|
||||
self._share.history.remove(row)
|
||||
self._grid.update()
|
||||
self._set_selection()
|
||||
|
||||
@ -1,51 +1,191 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from dataclasses import dataclass, field
|
||||
from nicegui import app, ui # type: ignore
|
||||
from typing import Any, Dict, Optional
|
||||
from nicegui import ui
|
||||
from . import Tab
|
||||
import autopve.elements as el
|
||||
from autopve import elements as el
|
||||
from autopve import storage
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Global(Tab):
|
||||
def __init__(self, answer: Optional[str] = None) -> None:
|
||||
self.keys = {
|
||||
"keyboard": {"description": "The keyboard layout with the following possible options"},
|
||||
class Setting(Tab):
|
||||
def __init__(self, answer: str, type: Optional[str] = None, keys: Dict[str, Dict[str, Any]] = {}) -> None:
|
||||
self.keys: Dict[str, Dict[str, Any]] = keys
|
||||
super().__init__(answer, type=type)
|
||||
|
||||
def _build(self):
|
||||
self.keys_controls()
|
||||
|
||||
def keys_controls(self):
|
||||
with ui.column() as col:
|
||||
col.tailwind.width("[560px]").align_items("center")
|
||||
with ui.card() as card:
|
||||
card.tailwind.width("full")
|
||||
key_select = ui.select(list(self.keys.keys()), label="key", new_value_mode="add", with_input=True)
|
||||
key_select.tailwind.width("full")
|
||||
with ui.row() as row:
|
||||
row.tailwind.width("full").align_items("center").justify_content("between")
|
||||
with ui.row() as row:
|
||||
row.tailwind.align_items("center")
|
||||
self.help = None
|
||||
key = el.FInput(label="key", on_change=lambda e: self.key_changed(e.value), read_only=True)
|
||||
key.bind_value_from(key_select)
|
||||
with ui.button(icon="help"):
|
||||
self.help = ui.tooltip("NA")
|
||||
ui.button(icon="add", on_click=lambda key=key: self.add_key(key.value))
|
||||
ui.separator()
|
||||
self._scroll = ui.scroll_area()
|
||||
self._scroll.tailwind.width("full").height("[480px]")
|
||||
items = storage.answer(self.answer)
|
||||
if self.type is not None and self.type in items:
|
||||
for key, value in items[self.type].items():
|
||||
if isinstance(value, list):
|
||||
self.add_key(key, "[" + ",".join(str(v) for v in value) + "]")
|
||||
else:
|
||||
self.add_key(key, str(value))
|
||||
|
||||
def add_key(self, key: str, value: str = ""):
|
||||
if self.key_valid(key) is True:
|
||||
with self._scroll:
|
||||
with ui.row() as key_row:
|
||||
key_row.tailwind.width("full").align_items("center").justify_content("between")
|
||||
with ui.row() as row:
|
||||
row.tailwind.align_items("center")
|
||||
if key in self.keys and "options" in self.keys[key]:
|
||||
options = self.keys[key]["options"]
|
||||
if value != "" and value not in self.keys[key]["options"]:
|
||||
options.append(value)
|
||||
control = el.FSelect(
|
||||
label=key,
|
||||
options=options,
|
||||
with_input=True,
|
||||
new_value_mode="add-unique",
|
||||
on_change=lambda e, key=key: self.set_key(key, e.value) if e.value is not None else None,
|
||||
)
|
||||
else:
|
||||
control = el.FInput(
|
||||
label=key,
|
||||
password=True if key == "root_password" else False,
|
||||
password_toggle_button=False,
|
||||
on_change=lambda e, key=key: self.set_key(key, e.value),
|
||||
)
|
||||
self._elements[key] = {
|
||||
"control": control,
|
||||
"row": key_row,
|
||||
}
|
||||
if isinstance(control, el.FSelect):
|
||||
control.value = self.keys[key]["options"][0] if value == "" else value
|
||||
else:
|
||||
control.value = value
|
||||
if key in self.keys:
|
||||
with ui.button(icon="help"):
|
||||
ui.tooltip(self.keys[key]["description"])
|
||||
ui.button(icon="remove", on_click=lambda _, key=key: self.remove_key(key))
|
||||
|
||||
def key_valid(self, key: str) -> bool:
|
||||
if key is not None and key != "" and key not in self._elements.keys():
|
||||
return True
|
||||
return False
|
||||
|
||||
def remove_key(self, key: str):
|
||||
self._scroll.remove(self._elements[key]["row"])
|
||||
del self._elements[key]
|
||||
if key in storage.answer(self.answer)[self.type]:
|
||||
del storage.answer(self.answer)[self.type][key]
|
||||
|
||||
def set_key(self, key: str, value: str):
|
||||
v: Any = ""
|
||||
if len(value) > 0:
|
||||
if key in self.keys and "type" in self.keys[key]:
|
||||
if self.keys[key]["type"] == "list" and len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
|
||||
l = value.strip()[1:-1].replace('"', "").replace("'", "").split(",")
|
||||
v = [v.strip() for v in l]
|
||||
elif self.keys[key]["type"] == "int":
|
||||
v = int(value)
|
||||
else:
|
||||
v = value
|
||||
else:
|
||||
if len(value) > 2 and value.strip()[0] == "[" and value.strip()[-1] == "]":
|
||||
l = value.strip()[1:-1].replace('"', "").replace("'", "").split(",")
|
||||
v = [v.strip() for v in l]
|
||||
elif value.isnumeric():
|
||||
v = int(value)
|
||||
else:
|
||||
v = value
|
||||
if self.type not in storage.answer(self.answer):
|
||||
storage.answer(self.answer)[self.type] = {}
|
||||
storage.answer(self.answer)[self.type][key] = v
|
||||
|
||||
def key_changed(self, value: str):
|
||||
if self.help is not None:
|
||||
if value in self.keys:
|
||||
self.help.text = self.keys[value]["description"]
|
||||
else:
|
||||
self.help.text = "NA"
|
||||
|
||||
|
||||
class Global(Setting):
|
||||
def __init__(self, answer: str) -> None:
|
||||
keys = {
|
||||
"keyboard": {
|
||||
"description": "The keyboard layout with the following possible options",
|
||||
"options": [
|
||||
"de",
|
||||
"de-ch",
|
||||
"dk",
|
||||
"en-gb",
|
||||
"en-us",
|
||||
"es",
|
||||
"fi",
|
||||
"fr",
|
||||
"fr-be",
|
||||
"fr-ca",
|
||||
"fr-ch",
|
||||
"hu",
|
||||
"is",
|
||||
"it",
|
||||
"jp",
|
||||
"lt",
|
||||
"mk",
|
||||
"nl",
|
||||
"no",
|
||||
"pl",
|
||||
"pt",
|
||||
"pt-br",
|
||||
"se",
|
||||
"si",
|
||||
"tr",
|
||||
],
|
||||
},
|
||||
"country": {"description": "The country code in the two letter variant. For example, at, us or fr."},
|
||||
"fqdn": {"description": "The fully qualified domain name of the host. The domain part will be used as the search domain."},
|
||||
"mailto": {"description": "The default email address for the user root."},
|
||||
"timezone": {"description": "The timezone in tzdata format. For example, Europe/Vienna or America/New_York."},
|
||||
"root_password": {"description": "The password for the root user.", "type": "str"},
|
||||
"root-password-hashed": {"description": "The pre-hashed password for the root user, which will be written verbatim to /etc/passwd. May be used instead of root_password and can be generated using the mkpasswd tool, for example.", "type": "str"},
|
||||
"root_ssh_keys": {"description": "Optional. SSH public keys to add to the root users authorized_keys file after the installation."},
|
||||
"reboot_on_error": {
|
||||
"description": "If set to true, the installer will reboot automatically when an error is encountered. The default behavior is to wait to give the administrator a chance to investigate why the installation failed."
|
||||
},
|
||||
}
|
||||
super().__init__(answer=answer, table="global")
|
||||
|
||||
def _build(self):
|
||||
self.key_picker(keys=self.keys)
|
||||
super().__init__(answer, type="global", keys=keys)
|
||||
|
||||
|
||||
class Network(Tab):
|
||||
def __init__(self, answer: Optional[str] = None) -> None:
|
||||
self.keys = {
|
||||
"source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer."},
|
||||
class Network(Setting):
|
||||
def __init__(self, answer: str) -> None:
|
||||
keys = {
|
||||
"source": {"description": "Where to source the static network configuration from. This can be from-dhcp or from-answer.", "options": ["from-dhcp", "from-answer"]},
|
||||
"cidr": {"description": "The IP address in CIDR notation. For example, 192.168.1.10/24."},
|
||||
"dns": {"description": "The IP address of the DNS server."},
|
||||
"gateway": {"description": "The IP address of the default gateway."},
|
||||
"filter": {"description": "Filter against the UDEV properties to select the network card. See filters."},
|
||||
}
|
||||
super().__init__(answer=answer, table="network")
|
||||
|
||||
def _build(self):
|
||||
self.key_picker(keys=self.keys)
|
||||
super().__init__(answer, type="network", keys=keys)
|
||||
|
||||
|
||||
class Disk(Tab):
|
||||
def __init__(self, answer: Optional[str] = None) -> None:
|
||||
self.keys = {
|
||||
class Disk(Setting):
|
||||
def __init__(self, answer: str) -> None:
|
||||
keys = {
|
||||
"filesystem": {"description": "One of the following options: ext4, xfs, zfs, or btrfs.", "options": ["ext4", "xfs", "zfs", "btrfs"]},
|
||||
"disk_list": {"description": 'List of disks to use. Useful if you are sure about the disk names. For example: disk_list = ["sda", "sdb"].'},
|
||||
"filter": {"description": "Filter against UDEV properties to select the disks for the installation. See filters."},
|
||||
@ -57,24 +197,50 @@ class Disk(Tab):
|
||||
"description": "The RAID level that should be used. Options are raid0, raid1, raid10, raidz-1, raidz-2, or raidz-3.",
|
||||
"options": ["raid0", "raid1", "raid10", "raidz-1", "raidz-2", "raidz-3"],
|
||||
},
|
||||
"zfs.ashift": {"description": ""},
|
||||
"zfs.arc_max": {"description": ""},
|
||||
"zfs.checksum": {"description": ""},
|
||||
"zfs.compress": {"description": ""},
|
||||
"zfs.copies": {"description": ""},
|
||||
"zfs.hdsize": {"description": ""},
|
||||
"lvm.hdsize": {"description": ""},
|
||||
"lvm.swapsize": {"description": ""},
|
||||
"lvm.maxroot": {"description": ""},
|
||||
"lvm.maxvz": {"description": ""},
|
||||
"lvm.minfree": {"description": ""},
|
||||
"zfs.ashift": {
|
||||
"description": "Defines the ashift value for the created pool. The ashift needs to be set at least to the sector-size of the underlying disks (2 to the power of ashift is the sector-size), or any disk which might be put in the pool (for example the replacement of a defective disk)."
|
||||
},
|
||||
"zfs.arc_max": {
|
||||
"description": "Defines the maximum size the ARC can grow to and thus limits the amount of memory ZFS will use. See also the section on how to limit ZFS memory usage for more details."
|
||||
},
|
||||
"zfs.checksum": {"description": "Defines which checksumming algorithm should be used for rpool."},
|
||||
"zfs.compress": {"description": "Defines whether compression is enabled for rpool."},
|
||||
"zfs.copies": {
|
||||
"description": "Defines the copies parameter for rpool. Check the zfs(8) manpage for the semantics, and why this does not replace redundancy on disk-level."
|
||||
},
|
||||
"zfs.hdsize": {
|
||||
"description": "Defines the total hard disk size to be used. This is useful to save free space on the hard disk(s) for further partitioning (for example to create a swap-partition). hdsize is only honored for bootable disks, that is only the first disk or mirror for RAID0, RAID1 or RAID10, and all disks in RAID-Z[123]."
|
||||
},
|
||||
"lvm.hdsize": {
|
||||
"description": "Defines the total hard disk size to be used. This way you can reserve free space on the hard disk for further partitioning (for example for an additional PV and VG on the same hard disk that can be used for LVM storage)."
|
||||
},
|
||||
"lvm.swapsize": {
|
||||
"description": "Defines the size of the swap volume. The default is the size of the installed memory, minimum 4 GB and maximum 8 GB. The resulting value cannot be greater than hdsize/8."
|
||||
},
|
||||
"lvm.maxroot": {
|
||||
"description": "Defines the maximum size of the root volume, which stores the operation system. The maximum limit of the root volume size is hdsize/4."
|
||||
},
|
||||
"lvm.maxvz": {
|
||||
"description": "Defines the maximum size of the data volume. The actual size of the data volume is: datasize = hdsize - rootsize - swapsize - minfree Where datasize cannot be bigger than maxvz."
|
||||
},
|
||||
"lvm.minfree": {
|
||||
"description": "Defines the amount of free space that should be left in the LVM volume group pve. With more than 128GB storage available, the default is 16GB, otherwise hdsize/8 will be used."
|
||||
},
|
||||
"btrfs.raid": {
|
||||
"description": "",
|
||||
"description": "The RAID level that should be used. Options are raid0, raid1, and raid10",
|
||||
"options": ["raid0", "raid1", "raid10"],
|
||||
},
|
||||
"btrfs.hdsize": {"description": ""},
|
||||
}
|
||||
super().__init__(answer=answer, table="disk-setup")
|
||||
super().__init__(answer, type="disk-setup", keys=keys)
|
||||
|
||||
def _build(self):
|
||||
self.key_picker(keys=self.keys)
|
||||
def key_valid(self, key: str) -> bool:
|
||||
if super().key_valid(key) is True:
|
||||
if "filter" in key and "disk_list" in self._elements.keys():
|
||||
el.Notification(f"Can not add {key} when disk_list is utilized!", type="negative", timeout=5)
|
||||
return False
|
||||
elif key == "disk_list" and any("filter" in k for k in self._elements.keys()):
|
||||
el.Notification("Can not add disk_list when a filter is utilized!", type="negative", timeout=5)
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
@ -1,8 +1,7 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from dataclasses import dataclass, field
|
||||
from nicegui import app, ui # type: ignore
|
||||
from typing import Optional
|
||||
from nicegui import ui
|
||||
from . import Tab
|
||||
import autopve.elements as el
|
||||
from autopve import elements as el
|
||||
from autopve import storage
|
||||
import logging
|
||||
|
||||
@ -10,12 +9,75 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class System(Tab):
|
||||
def __init__(self, answer=None) -> None:
|
||||
super().__init__(answer)
|
||||
def __init__(self, answer: str, type: Optional[str] = None, note: str = "") -> None:
|
||||
self.note: str = note
|
||||
self.select: Optional[ui.select] = None
|
||||
self.last_update_timestamp: float = 0
|
||||
super().__init__(answer, type=type)
|
||||
|
||||
def _build(self):
|
||||
def set_match(match: str):
|
||||
storage.answer(self.answer)["match"] = match
|
||||
self.restriction_picker()
|
||||
|
||||
answer = storage.answer(self.answer)
|
||||
el.FInput("Match String", value=answer["match"] if "match" in answer else "", on_change=lambda e: set_match(e.value))
|
||||
def restriction_picker(self):
|
||||
def restriction_controls():
|
||||
with ui.column() as col:
|
||||
col.tailwind.width("[560px]").align_items("center")
|
||||
with ui.card() as card:
|
||||
card.tailwind.width("full")
|
||||
self.select = ui.select(self._share.unique_system_information, new_value_mode="add", with_input=True)
|
||||
self.select.tailwind.width("full")
|
||||
card.on("mousemove", handler=self.update, throttle=3)
|
||||
with ui.row() as row:
|
||||
row.tailwind.width("full").align_items("center").justify_content("between")
|
||||
restriction = el.FInput(read_only=True)
|
||||
restriction.tailwind.width("[420px]")
|
||||
restriction.bind_value_from(self.select)
|
||||
ui.button(icon="add", on_click=lambda restriction=restriction: add_restriction(restriction.value))
|
||||
ui.label(self.note).tailwind.align_self("center")
|
||||
ui.separator()
|
||||
self.scroll = ui.scroll_area()
|
||||
self.scroll.tailwind.width("full").height("[480px]")
|
||||
restrictions = []
|
||||
if self.type in storage.answer(self.answer):
|
||||
restrictions = storage.answer(self.answer)[self.type]
|
||||
for restriction in restrictions:
|
||||
add_restriction(restriction)
|
||||
|
||||
def add_restriction(restriction: str):
|
||||
if restriction is not None and restriction.strip() != "" and restriction not in self._elements.keys():
|
||||
with self.scroll:
|
||||
with ui.row() as row:
|
||||
row.tailwind.width("full").align_items("center").justify_content("between")
|
||||
self._elements[restriction] = {
|
||||
"control": el.FInput(value=restriction, read_only=True),
|
||||
"row": row,
|
||||
}
|
||||
self._elements[restriction]["control"].tailwind.width("[420px]")
|
||||
ui.button(icon="remove", on_click=lambda _, r=restriction: remove_restriction(r))
|
||||
if self.type not in storage.answer(self.answer):
|
||||
storage.answer(self.answer)[self.type] = []
|
||||
if restriction not in storage.answer(self.answer)[self.type]:
|
||||
storage.answer(self.answer)[self.type].append(restriction)
|
||||
|
||||
def remove_restriction(restriction):
|
||||
self.scroll.remove(self._elements[restriction]["row"])
|
||||
del self._elements[restriction]
|
||||
if restriction in storage.answer(self.answer)[self.type]:
|
||||
storage.answer(self.answer)[self.type].remove(restriction)
|
||||
|
||||
restriction_controls()
|
||||
|
||||
def update(self):
|
||||
if self.select is not None and self._share.last_timestamp > self.last_update_timestamp:
|
||||
self.last_update_timestamp = self._share.last_timestamp
|
||||
self.select.update()
|
||||
|
||||
|
||||
class MustContain(System):
|
||||
def __init__(self, answer: str) -> None:
|
||||
super().__init__(answer, type="must_contain", note="The system information must contain at least one of these strings.")
|
||||
|
||||
|
||||
class MustNotContain(System):
|
||||
def __init__(self, answer: str) -> None:
|
||||
super().__init__(answer, type="must_not_contain", note="The system information must not contain any of these strings.")
|
||||
|
||||
95
main.py
95
main.py
@ -2,8 +2,12 @@ import mylogging
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from typing import Any, Dict
|
||||
import json
|
||||
import tomlkit
|
||||
import os
|
||||
|
||||
os.environ.setdefault("NICEGUI_STORAGE_PATH", "data")
|
||||
if not os.path.exists("data"):
|
||||
logger.warning("Could not find 'data' directory, verify bind mounts.")
|
||||
if os.path.exists(".nicegui"):
|
||||
@ -14,12 +18,14 @@ if not os.path.exists("data"):
|
||||
os.makedirs("data")
|
||||
else:
|
||||
logger.warning("Found 'data' directory.")
|
||||
os.environ.setdefault("NICEGUI_STORAGE_PATH", "data")
|
||||
from fastapi import Request
|
||||
from fastapi.responses import PlainTextResponse
|
||||
from nicegui import app, Client, ui # type: ignore
|
||||
|
||||
|
||||
if __name__ in {"__main__", "__mp_main__"}:
|
||||
from nicegui import app, ui # type: ignore
|
||||
|
||||
@ui.page("/", response_timeout=30)
|
||||
# async def page(client: Client) -> None:
|
||||
def page() -> None:
|
||||
ui.card.default_style("max-width: none")
|
||||
ui.card.default_props("flat bordered")
|
||||
ui.input.default_props("outlined dense hide-bottom-space")
|
||||
@ -29,9 +35,86 @@ if __name__ in {"__main__", "__mp_main__"}:
|
||||
ui.stepper.default_props("flat")
|
||||
ui.stepper.default_classes("full-size-stepper")
|
||||
|
||||
from autopve import page
|
||||
import autopve.elements as el
|
||||
from autopve.drawer import Drawer
|
||||
from autopve.content import Content
|
||||
|
||||
app.add_static_files("/static", "static")
|
||||
el.load_element_css()
|
||||
ui.colors(
|
||||
primary=el.orange,
|
||||
secondary=el.orange,
|
||||
accent=el.orange,
|
||||
dark=el.dark,
|
||||
positive="#21BA45",
|
||||
negative="#C10015",
|
||||
info="#5C8984",
|
||||
warning="#F2C037",
|
||||
)
|
||||
column = ui.column()
|
||||
content = Content()
|
||||
drawer = Drawer(column, content.answer_selected, content.hide)
|
||||
drawer.build()
|
||||
|
||||
|
||||
@app.post("/answer")
|
||||
async def post_answer(request: Request) -> PlainTextResponse:
|
||||
import autopve.elements as el
|
||||
from autopve import storage
|
||||
from autopve.tabs import history
|
||||
|
||||
def response(answer: str, system_info: Dict[str, Any], data: Dict[str, Any]):
|
||||
toml = tomlkit.dumps(data)
|
||||
toml_fixed = ""
|
||||
for line in toml.splitlines():
|
||||
if len(line) > 0 and line[0] == '"':
|
||||
line = line.replace('"', "", 2)
|
||||
toml_fixed = toml_fixed + line + "\n"
|
||||
r = history.Request(answer=answer, response=toml_fixed, system_info=system_info)
|
||||
history.History.add_history(r)
|
||||
for client in Client.instances.values():
|
||||
if not client.has_socket_connection:
|
||||
continue
|
||||
with client:
|
||||
el.Notification(f"New answer request from {r.name} served by {r.answer}!", type="positive", timeout=15)
|
||||
return PlainTextResponse(toml_fixed)
|
||||
|
||||
system_info = await request.json()
|
||||
system_info_raw = json.dumps(system_info)
|
||||
default_data = storage.answer("Default", copy=True)
|
||||
answers = list(storage.answers.keys())
|
||||
if "Default" in answers:
|
||||
answers.remove("Default")
|
||||
for answer in answers:
|
||||
answer_data = storage.answer(answer, copy=True)
|
||||
match = False
|
||||
if "must_contain" in answer_data:
|
||||
for entry in answer_data["must_contain"]:
|
||||
if len(entry) > 0 and entry in system_info_raw:
|
||||
match = True
|
||||
if "must_not_contain" in answer_data:
|
||||
for entry in answer_data["must_not_contain"]:
|
||||
if len(entry) > 0 and entry in system_info_raw:
|
||||
match = False
|
||||
if match is True:
|
||||
if "global" in default_data and "global" in answer_data:
|
||||
default_data["global"].update(answer_data["global"])
|
||||
if "network" in default_data and "network" in answer_data:
|
||||
default_data["network"].update(answer_data["network"])
|
||||
if "disk-setup" in default_data and "disk-setup" in answer_data:
|
||||
if any("filter" in k for k in answer_data["disk-setup"]) and "disk_list" in default_data["disk-setup"]:
|
||||
del default_data["disk-setup"]["disk_list"]
|
||||
if "disk_list" in answer_data["disk-setup"]:
|
||||
for key in list(default_data["disk-setup"].keys()):
|
||||
if "filter" in key:
|
||||
del default_data["disk-setup"][key]
|
||||
default_data["disk-setup"].update(answer_data["disk-setup"])
|
||||
return response(answer, system_info, default_data)
|
||||
return response("Default", system_info, default_data)
|
||||
|
||||
|
||||
if __name__ in {"__main__", "__mp_main__"}:
|
||||
from autopve import logo
|
||||
|
||||
app.on_startup(lambda: print(f"Starting autopve, bound to the following addresses {', '.join(app.urls)}.", flush=True))
|
||||
page.build()
|
||||
ui.run(title="autopve", favicon=logo.logo, dark=True, reload=False, show=False, show_welcome_message=False)
|
||||
|
||||
3
pyproject.toml
Normal file
3
pyproject.toml
Normal file
@ -0,0 +1,3 @@
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "--driver Chrome"
|
||||
asyncio_mode = "auto"
|
||||
55
requirements-test.txt
Normal file
55
requirements-test.txt
Normal file
@ -0,0 +1,55 @@
|
||||
aiofiles==23.2.1
|
||||
aiohttp==3.9.5
|
||||
aiosignal==1.3.1
|
||||
annotated-types==0.6.0
|
||||
anyio==4.3.0
|
||||
attrs==23.2.0
|
||||
bidict==0.23.1
|
||||
certifi==2024.2.2
|
||||
charset-normalizer==3.3.2
|
||||
click==8.1.7
|
||||
docutils==0.19
|
||||
fastapi==0.109.2
|
||||
frozenlist==1.4.1
|
||||
h11==0.14.0
|
||||
httpcore==1.0.5
|
||||
httptools==0.6.1
|
||||
httpx==0.27.0
|
||||
idna==3.7
|
||||
ifaddr==0.2.0
|
||||
itsdangerous==2.2.0
|
||||
Jinja2==3.1.3
|
||||
markdown2==2.4.13
|
||||
MarkupSafe==2.1.5
|
||||
multidict==6.0.5
|
||||
nicegui==1.4.23
|
||||
orjson==3.10.3
|
||||
pscript==0.7.7
|
||||
pydantic==2.7.1
|
||||
pydantic_core==2.18.2
|
||||
Pygments==2.17.2
|
||||
python-dotenv==1.0.1
|
||||
python-engineio==4.9.0
|
||||
python-multipart==0.0.9
|
||||
python-socketio==5.11.2
|
||||
PyYAML==6.0.1
|
||||
requests==2.31.0
|
||||
simple-websocket==1.0.0
|
||||
sniffio==1.3.1
|
||||
starlette==0.36.3
|
||||
tomlkit==0.12.4
|
||||
typing_extensions==4.11.0
|
||||
urllib3==2.2.1
|
||||
uvicorn==0.29.0
|
||||
uvloop==0.19.0
|
||||
vbuild==0.8.2
|
||||
watchfiles==0.21.0
|
||||
websockets==12.0
|
||||
wsproto==1.2.0
|
||||
yarl==1.9.4
|
||||
pytest
|
||||
pytest-selenium
|
||||
pytest-asyncio
|
||||
selenium
|
||||
icecream
|
||||
beautifulsoup4
|
||||
@ -1,54 +1,46 @@
|
||||
aiofiles==23.2.1
|
||||
aiohttp==3.9.3
|
||||
aiohttp==3.9.5
|
||||
aiosignal==1.3.1
|
||||
annotated-types==0.6.0
|
||||
anyio==4.2.0
|
||||
anyio==4.3.0
|
||||
attrs==23.2.0
|
||||
bidict==0.22.1
|
||||
black==24.2.0
|
||||
bidict==0.23.1
|
||||
certifi==2024.2.2
|
||||
charset-normalizer==3.3.2
|
||||
click==8.1.7
|
||||
docutils==0.19
|
||||
fastapi==0.109.2
|
||||
frozenlist==1.4.1
|
||||
h11==0.14.0
|
||||
httpcore==1.0.2
|
||||
httpcore==1.0.5
|
||||
httptools==0.6.1
|
||||
httpx==0.26.0
|
||||
idna==3.6
|
||||
httpx==0.27.0
|
||||
idna==3.7
|
||||
ifaddr==0.2.0
|
||||
itsdangerous==2.1.2
|
||||
itsdangerous==2.2.0
|
||||
Jinja2==3.1.3
|
||||
markdown2==2.4.12
|
||||
markdown2==2.4.13
|
||||
MarkupSafe==2.1.5
|
||||
multidict==6.0.5
|
||||
mypy-extensions==1.0.0
|
||||
nicegui==1.4.15
|
||||
nicegui-highcharts==1.0.1
|
||||
numpy==1.26.4
|
||||
orjson==3.9.13
|
||||
packaging==23.2
|
||||
pathspec==0.12.1
|
||||
platformdirs==4.2.0
|
||||
nicegui==1.4.23
|
||||
orjson==3.10.3
|
||||
pscript==0.7.7
|
||||
pydantic==2.6.1
|
||||
pydantic_core==2.16.2
|
||||
pydantic==2.7.1
|
||||
pydantic_core==2.18.2
|
||||
Pygments==2.17.2
|
||||
python-dotenv==1.0.1
|
||||
python-engineio==4.9.0
|
||||
python-multipart==0.0.9
|
||||
python-socketio==5.11.1
|
||||
python-socketio==5.11.2
|
||||
PyYAML==6.0.1
|
||||
requests==2.31.0
|
||||
scipy==1.12.0
|
||||
simple-websocket==1.0.0
|
||||
sniffio==1.3.0
|
||||
sniffio==1.3.1
|
||||
starlette==0.36.3
|
||||
tomlkit==0.12.4
|
||||
types-requests==2.31.0.20240125
|
||||
typing_extensions==4.9.0
|
||||
urllib3==2.2.0
|
||||
uvicorn==0.27.1
|
||||
typing_extensions==4.11.0
|
||||
urllib3==2.2.1
|
||||
uvicorn==0.29.0
|
||||
uvloop==0.19.0
|
||||
vbuild==0.8.2
|
||||
watchfiles==0.21.0
|
||||
|
||||
52
tests/conftest.py
Normal file
52
tests/conftest.py
Normal file
@ -0,0 +1,52 @@
|
||||
# from nicegui.testing.conftest import *
|
||||
from collections.abc import Generator
|
||||
import os
|
||||
import pytest
|
||||
from nicegui.testing import Screen
|
||||
from nicegui.testing.conftest import (
|
||||
capabilities, # noqa: F401
|
||||
driver, # noqa: F401
|
||||
remove_all_screenshots, # noqa: F401
|
||||
reset_globals, # noqa: F401
|
||||
DOWNLOAD_DIR,
|
||||
)
|
||||
from selenium import webdriver
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def chrome_options(chrome_options: webdriver.ChromeOptions) -> webdriver.ChromeOptions:
|
||||
"""Configure the Chrome driver options."""
|
||||
chrome_options.add_argument("disable-dev-shm-using")
|
||||
chrome_options.add_argument("no-sandbox")
|
||||
chrome_options.add_argument("headless")
|
||||
# check if we are running on GitHub Actions
|
||||
if "GITHUB_ACTIONS" in os.environ:
|
||||
chrome_options.add_argument("disable-gpu")
|
||||
else:
|
||||
chrome_options.add_argument("--use-gl=angle")
|
||||
chrome_options.add_argument("window-size=1920x1080")
|
||||
chrome_options.add_experimental_option(
|
||||
"prefs",
|
||||
{
|
||||
"download.default_directory": str(DOWNLOAD_DIR),
|
||||
"download.prompt_for_download": False, # To auto download the file
|
||||
"download.directory_upgrade": True,
|
||||
},
|
||||
)
|
||||
if "CHROME_BINARY_LOCATION" in os.environ:
|
||||
chrome_options.binary_location = os.environ["CHROME_BINARY_LOCATION"]
|
||||
return chrome_options
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def screen(
|
||||
driver: webdriver.Chrome, # noqa: F811
|
||||
request: pytest.FixtureRequest,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> Generator[Screen, None, None]:
|
||||
"""Create a new Screen instance."""
|
||||
screen_ = Screen(driver, caplog)
|
||||
yield screen_
|
||||
if screen_.is_open:
|
||||
screen_.shot(request.node.name)
|
||||
screen_.stop_server()
|
||||
Reference in New Issue
Block a user