0x0/mod.py

315 lines
12 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
from itertools import zip_longest
from sys import stdout
import time
from textual.app import App, ComposeResult
2024-03-30 17:30:33 +01:00
from textual.widgets import DataTable, Header, Footer, RichLog, Static, Input
from textual.containers import Horizontal, Vertical
from textual.screen import Screen
from textual import log
from rich.text import Text
from jinja2.filters import do_filesizeformat
import ipaddress
from fhost import db, File, AddrFilter, su, app as fhost_app
2024-09-27 17:39:18 +02:00
from modui import FileTable, mime, MpvWidget, Notification
fhost_app.app_context().push()
2024-09-27 17:39:18 +02:00
class NullptrMod(Screen):
BINDINGS = [
("q", "quit_app", "Quit"),
2024-03-30 17:30:33 +01:00
("f1", "filter(1, 'Name')", "Lookup name"),
("f2", "filter(2, 'IP address')", "Filter IP"),
("f3", "filter(3, 'MIME Type')", "Filter MIME"),
("f4", "filter(4, 'Extension')", "Filter Ext."),
("f5", "refresh", "Refresh"),
("f6", "filter_clear", "Clear filter"),
2024-03-30 17:30:33 +01:00
("f7", "filter(5, 'User agent')", "Filter UA"),
("r", "remove_file(False)", "Remove file"),
("ctrl+r", "remove_file(True)", "Ban file"),
("p", "ban_ip(False)", "Ban IP"),
("ctrl+p", "ban_ip(True)", "Nuke IP"),
]
async def action_quit_app(self):
self.mpvw.shutdown()
await self.app.action_quit()
def action_refresh(self):
ftable = self.query_one("#ftable")
ftable.watch_query(None, None)
def action_filter_clear(self):
2024-03-30 17:30:33 +01:00
self.finput.display = False
ftable = self.query_one("#ftable")
ftable.focus()
ftable.query = ftable.base_query
def action_filter(self, fcol: int, label: str):
2024-03-30 17:30:33 +01:00
self.finput.placeholder = label
self.finput.display = True
self.finput.focus()
self.filter_col = fcol
self._refresh_layout()
if self.current_file:
match fcol:
2024-03-30 17:30:33 +01:00
case 1: self.finput.value = ""
case 2: self.finput.value = self.current_file.addr.compressed
2024-03-30 17:30:33 +01:00
case 3: self.finput.value = self.current_file.mime
case 4: self.finput.value = self.current_file.ext
case 5: self.finput.value = self.current_file.ua or ""
def on_input_submitted(self, message: Input.Submitted) -> None:
2024-03-30 17:30:33 +01:00
self.finput.display = False
ftable = self.query_one("#ftable")
ftable.focus()
2024-09-27 17:39:18 +02:00
q = ftable.base_query
if len(message.value):
match self.filter_col:
case 1:
2024-09-27 17:39:18 +02:00
try:
q = q.filter(File.id == su.debase(message.value))
except ValueError:
return
case 2:
try:
addr = ipaddress.ip_address(message.value)
if type(addr) is ipaddress.IPv6Address:
addr = addr.ipv4_mapped or addr
2024-09-27 17:39:18 +02:00
q = q.filter(File.addr == addr)
except ValueError:
return
case 3: q = q.filter(File.mime.like(message.value))
case 4: q = q.filter(File.ext.like(message.value))
case 5: q = q.filter(File.ua.like(message.value))
ftable.query = q
def action_remove_file(self, permanent: bool) -> None:
if self.current_file:
self.current_file.delete(permanent)
db.session.commit()
2024-09-27 17:39:18 +02:00
self.mount(Notification(f"{'Banned' if permanent else 'Removed'}"
f"file {self.current_file.getname()}"))
self.action_refresh()
def action_ban_ip(self, nuke: bool) -> None:
if self.current_file:
2024-09-27 17:39:18 +02:00
addr = self.current_file.addr
if AddrFilter.query.filter(AddrFilter.addr == addr).scalar():
txt = f"{addr.compressed} is already banned"
else:
2024-09-27 17:39:18 +02:00
db.session.add(AddrFilter(addr))
db.session.commit()
2024-09-27 17:39:18 +02:00
txt = f"Banned {addr.compressed}"
if nuke:
tsize = 0
trm = 0
2024-09-27 17:39:18 +02:00
for f in File.query.filter(File.addr == addr):
if f.getpath().is_file():
tsize += f.size or f.getpath().stat().st_size
trm += 1
f.delete(True)
db.session.commit()
2024-09-27 17:39:18 +02:00
txt += f", removed {trm} {'files' if trm != 1 else 'file'} " \
f"totaling {do_filesizeformat(tsize, True)}"
self.mount(Notification(txt))
self._refresh_layout()
ftable = self.query_one("#ftable")
ftable.watch_query(None, None)
def on_update(self) -> None:
stdout.write("\033[?25l")
stdout.flush()
def compose(self) -> ComposeResult:
yield Header()
yield Horizontal(
2024-03-30 17:30:33 +01:00
FileTable(id="ftable", zebra_stripes=True, cursor_type="row"),
Vertical(
2024-03-30 17:30:33 +01:00
DataTable(id="finfo", show_header=False, cursor_type="none"),
MpvWidget(id="mpv"),
2024-03-30 17:30:33 +01:00
RichLog(id="ftextlog", auto_scroll=False),
2024-09-27 17:39:18 +02:00
id="infopane"))
2024-03-30 17:30:33 +01:00
yield Input(id="filter_input")
yield Footer()
def on_mount(self) -> None:
self.current_file = None
self.ftable = self.query_one("#ftable")
self.ftable.focus()
self.finfo = self.query_one("#finfo")
self.finfo.add_columns("key", "value")
self.mpvw = self.query_one("#mpv")
self.ftlog = self.query_one("#ftextlog")
2024-03-30 17:30:33 +01:00
self.finput = self.query_one("#filter_input")
self.mimehandler = mime.MIMEHandler()
2024-09-27 17:39:18 +02:00
self.mimehandler.register(mime.MIMECategory.Archive,
self.handle_libarchive)
self.mimehandler.register(mime.MIMECategory.Text, self.handle_text)
self.mimehandler.register(mime.MIMECategory.AV, self.handle_mpv)
2024-09-27 17:39:18 +02:00
self.mimehandler.register(mime.MIMECategory.Document,
self.handle_mupdf)
self.mimehandler.register(mime.MIMECategory.Fallback,
self.handle_libarchive)
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_mpv)
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_raw)
def handle_libarchive(self, cat):
import libarchive
with libarchive.file_reader(str(self.current_file.getpath())) as a:
self.ftlog.write("\n".join(e.path for e in a))
return True
def handle_text(self, cat):
with open(self.current_file.getpath(), "r") as sf:
2024-09-27 17:39:18 +02:00
data = sf.read(1000000).replace("\033", "")
self.ftlog.write(data)
return True
def handle_mupdf(self, cat):
import fitz
with fitz.open(self.current_file.getpath(),
filetype=self.current_file.ext.lstrip(".")) as doc:
p = doc.load_page(0)
pix = p.get_pixmap(dpi=72)
imgdata = pix.tobytes("ppm").hex()
self.mpvw.styles.height = "40%"
self.mpvw.start_mpv("hex://" + imgdata, 0)
2024-09-27 17:39:18 +02:00
self.ftlog.write(
Text.from_markup(f"[bold]Pages:[/bold] {doc.page_count}"))
self.ftlog.write(Text.from_markup("[bold]Metadata:[/bold]"))
for k, v in doc.metadata.items():
self.ftlog.write(Text.from_markup(f" [bold]{k}:[/bold] {v}"))
toc = doc.get_toc()
if len(toc):
self.ftlog.write(Text.from_markup("[bold]TOC:[/bold]"))
for lvl, title, page in toc:
self.ftlog.write(f"{' ' * lvl} {page}: {title}")
return True
def handle_mpv(self, cat):
if cat == mime.MIMECategory.AV or self.current_file.nsfw_score >= 0:
self.mpvw.styles.height = "20%"
self.mpvw.start_mpv(str(self.current_file.getpath()), 0)
import av
with av.open(str(self.current_file.getpath())) as c:
self.ftlog.write(Text("Format:", style="bold"))
self.ftlog.write(f" {c.format.long_name}")
if len(c.metadata):
self.ftlog.write(Text("Metadata:", style="bold"))
for k, v in c.metadata.items():
self.ftlog.write(f" {k}: {v}")
for s in c.streams:
2024-09-27 17:39:18 +02:00
self.ftlog.write(
Text(f"Stream {s.index}:", style="bold"))
self.ftlog.write(f" Type: {s.type}")
if s.base_rate:
self.ftlog.write(f" Frame rate: {s.base_rate}")
if len(s.metadata):
self.ftlog.write(Text(" Metadata:", style="bold"))
for k, v in s.metadata.items():
self.ftlog.write(f" {k}: {v}")
return True
return False
def handle_raw(self, cat):
def hexdump(binf, length):
def fmt(s):
if isinstance(s, str):
c = chr(int(s, 16))
else:
c = chr(s)
s = c
2024-09-27 17:39:18 +02:00
if c.isalpha():
return f"\0[chartreuse1]{s}\0[/chartreuse1]"
if c.isdigit():
return f"\0[gold1]{s}\0[/gold1]"
if not c.isprintable():
g = "grey50" if c == "\0" else "cadet_blue"
return f"\0[{g}]{s if len(s) == 2 else '.'}\0[/{g}]"
return s
2024-09-27 17:39:18 +02:00
return Text.from_markup(
"\n".join(' '.join(
map(fmt, map(''.join, zip(*[iter(c.hex())] * 2)))) +
f"{' ' * (16 - len(c))} {''.join(map(fmt, c))}"
for c in
map(lambda x: bytes([n for n in x if n is not None]),
zip_longest(
*[iter(binf.read(min(length, 16 * 10)))] * 16))))
with open(self.current_file.getpath(), "rb") as binf:
self.ftlog.write(hexdump(binf, self.current_file.size))
if self.current_file.size > 16*10*2:
binf.seek(self.current_file.size-16*10)
self.ftlog.write(" [...] ".center(64, ''))
2024-09-27 17:39:18 +02:00
self.ftlog.write(hexdump(binf,
self.current_file.size - binf.tell()))
return True
def on_file_table_selected(self, message: FileTable.Selected) -> None:
f = message.file
self.current_file = f
self.finfo.clear()
self.finfo.add_rows([
("ID:", str(f.id)),
("File name:", f.getname()),
2024-09-27 17:39:18 +02:00
("URL:", f.geturl()
if fhost_app.config["SERVER_NAME"]
else "⚠ Set SERVER_NAME in config.py to display"),
("File size:", do_filesizeformat(f.size, True)),
("MIME type:", f.mime),
("SHA256 checksum:", f.sha256),
("Uploaded by:", Text(f.addr.compressed)),
("User agent:", Text(f.ua or "")),
("Management token:", f.mgmt_token),
("Secret:", f.secret),
2024-09-27 17:39:18 +02:00
("Is NSFW:", ("Yes" if f.is_nsfw else "No") +
(f" (Score: {f.nsfw_score:0.4f})"
if f.nsfw_score else " (Not scanned)")),
("Is banned:", "Yes" if f.removed else "No"),
2024-09-27 17:39:18 +02:00
("Expires:",
time.strftime("%Y-%m-%d %H:%M:%S",
time.gmtime(File.get_expiration(f.expiration,
f.size)/1000)))
])
self.mpvw.stop_mpv(True)
2024-03-30 17:30:33 +01:00
self.ftlog.clear()
if f.getpath().is_file():
self.mimehandler.handle(f.mime, f.ext)
2024-03-30 17:30:33 +01:00
self.ftlog.scroll_to(x=0, y=0, animate=False)
2024-09-27 17:39:18 +02:00
class NullptrModApp(App):
CSS_PATH = "mod.css"
def on_mount(self) -> None:
self.title = "0x0 File Moderation Interface"
self.main_screen = NullptrMod()
self.install_screen(self.main_screen, name="main")
self.push_screen("main")
2024-09-27 17:39:18 +02:00
if __name__ == "__main__":
app = NullptrModApp()
app.run()