mirror of
https://git.0x0.st/mia/0x0.git
synced 2024-11-23 17:17:12 +01:00
Compare commits
No commits in common. "a2b322f8687f72462fa9ac892ef8459fb374b074" and "6393538333338f695e30cce98a82836b04d2658a" have entirely different histories.
a2b322f868
...
6393538333
25 changed files with 426 additions and 700 deletions
|
@ -5,4 +5,4 @@ print("Instead, please run")
|
||||||
print("")
|
print("")
|
||||||
print(" $ FLASK_APP=fhost flask prune")
|
print(" $ FLASK_APP=fhost flask prune")
|
||||||
print("")
|
print("")
|
||||||
exit(1)
|
exit(1);
|
||||||
|
|
386
fhost.py
386
fhost.py
|
@ -1,7 +1,8 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Copyright © 2024 Mia Herkt
|
Copyright © 2020 Mia Herkt
|
||||||
Licensed under the EUPL, Version 1.2 or - as soon as approved
|
Licensed under the EUPL, Version 1.2 or - as soon as approved
|
||||||
by the European Commission - subsequent versions of the EUPL
|
by the European Commission - subsequent versions of the EUPL
|
||||||
(the "License");
|
(the "License");
|
||||||
|
@ -18,38 +19,30 @@
|
||||||
and limitations under the License.
|
and limitations under the License.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from flask import Flask, abort, make_response, redirect, render_template, \
|
from flask import Flask, abort, make_response, redirect, request, send_from_directory, url_for, Response, render_template
|
||||||
Request, request, Response, send_from_directory, url_for
|
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
from flask_migrate import Migrate
|
from flask_migrate import Migrate
|
||||||
from sqlalchemy import and_, or_
|
from sqlalchemy import and_, or_
|
||||||
from sqlalchemy.orm import declared_attr
|
|
||||||
import sqlalchemy.types as types
|
|
||||||
from jinja2.exceptions import *
|
from jinja2.exceptions import *
|
||||||
from jinja2 import ChoiceLoader, FileSystemLoader
|
from jinja2 import ChoiceLoader, FileSystemLoader
|
||||||
from hashlib import file_digest
|
from hashlib import sha256
|
||||||
from magic import Magic
|
from magic import Magic
|
||||||
from mimetypes import guess_extension
|
from mimetypes import guess_extension
|
||||||
import click
|
import click
|
||||||
import enum
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import datetime
|
import datetime
|
||||||
import ipaddress
|
|
||||||
import typing
|
import typing
|
||||||
import requests
|
import requests
|
||||||
import secrets
|
import secrets
|
||||||
import re
|
|
||||||
from validators import url as url_valid
|
from validators import url as url_valid
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
app = Flask(__name__, instance_relative_config=True)
|
app = Flask(__name__, instance_relative_config=True)
|
||||||
app.config.update(
|
app.config.update(
|
||||||
SQLALCHEMY_TRACK_MODIFICATIONS = False,
|
SQLALCHEMY_TRACK_MODIFICATIONS = False,
|
||||||
PREFERRED_URL_SCHEME="https", # nginx users: make sure to have
|
PREFERRED_URL_SCHEME = "https", # nginx users: make sure to have 'uwsgi_param UWSGI_SCHEME $scheme;' in your config
|
||||||
# 'uwsgi_param UWSGI_SCHEME $scheme;' in
|
|
||||||
# your config
|
|
||||||
MAX_CONTENT_LENGTH = 256 * 1024 * 1024,
|
MAX_CONTENT_LENGTH = 256 * 1024 * 1024,
|
||||||
MAX_URL_LENGTH = 4096,
|
MAX_URL_LENGTH = 4096,
|
||||||
USE_X_SENDFILE = False,
|
USE_X_SENDFILE = False,
|
||||||
|
@ -70,6 +63,12 @@ app.config.update(
|
||||||
"text/plain" : ".txt",
|
"text/plain" : ".txt",
|
||||||
"text/x-diff" : ".diff",
|
"text/x-diff" : ".diff",
|
||||||
},
|
},
|
||||||
|
FHOST_MIME_BLACKLIST = [
|
||||||
|
"application/x-dosexec",
|
||||||
|
"application/java-archive",
|
||||||
|
"application/java-vm"
|
||||||
|
],
|
||||||
|
FHOST_UPLOAD_BLACKLIST = None,
|
||||||
NSFW_DETECT = False,
|
NSFW_DETECT = False,
|
||||||
NSFW_THRESHOLD = 0.92,
|
NSFW_THRESHOLD = 0.92,
|
||||||
VSCAN_SOCKET = None,
|
VSCAN_SOCKET = None,
|
||||||
|
@ -79,10 +78,10 @@ app.config.update(
|
||||||
"PUA.Win.Packer.XmMusicFile",
|
"PUA.Win.Packer.XmMusicFile",
|
||||||
],
|
],
|
||||||
VSCAN_INTERVAL = datetime.timedelta(days=7),
|
VSCAN_INTERVAL = datetime.timedelta(days=7),
|
||||||
URL_ALPHABET="DEQhd2uFteibPwq0SWBInTpA_jcZL5GKz3YCR14Ulk87Jors9vNHgfaOmMX"
|
URL_ALPHABET = "DEQhd2uFteibPwq0SWBInTpA_jcZL5GKz3YCR14Ulk87Jors9vNHgfaOmMXy6Vx-",
|
||||||
"y6Vx-",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if not app.config["TESTING"]:
|
||||||
app.config.from_pyfile("config.py")
|
app.config.from_pyfile("config.py")
|
||||||
app.jinja_loader = ChoiceLoader([
|
app.jinja_loader = ChoiceLoader([
|
||||||
FileSystemLoader(str(Path(app.instance_path) / "templates")),
|
FileSystemLoader(str(Path(app.instance_path) / "templates")),
|
||||||
|
@ -98,7 +97,7 @@ if app.config["NSFW_DETECT"]:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
mimedetect = Magic(mime=True, mime_encoding=False)
|
mimedetect = Magic(mime=True, mime_encoding=False)
|
||||||
except TypeError:
|
except:
|
||||||
print("""Error: You have installed the wrong version of the 'magic' module.
|
print("""Error: You have installed the wrong version of the 'magic' module.
|
||||||
Please install python-magic.""")
|
Please install python-magic.""")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
@ -106,7 +105,6 @@ Please install python-magic.""")
|
||||||
db = SQLAlchemy(app)
|
db = SQLAlchemy(app)
|
||||||
migrate = Migrate(app, db)
|
migrate = Migrate(app, db)
|
||||||
|
|
||||||
|
|
||||||
class URL(db.Model):
|
class URL(db.Model):
|
||||||
__tablename__ = "URL"
|
__tablename__ = "URL"
|
||||||
id = db.Column(db.Integer, primary_key = True)
|
id = db.Column(db.Integer, primary_key = True)
|
||||||
|
@ -121,7 +119,6 @@ class URL(db.Model):
|
||||||
def geturl(self):
|
def geturl(self):
|
||||||
return url_for("get", path=self.getname(), _external=True) + "\n"
|
return url_for("get", path=self.getname(), _external=True) + "\n"
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def get(url):
|
def get(url):
|
||||||
u = URL.query.filter_by(url=url).first()
|
u = URL.query.filter_by(url=url).first()
|
||||||
|
|
||||||
|
@ -132,35 +129,12 @@ class URL(db.Model):
|
||||||
|
|
||||||
return u
|
return u
|
||||||
|
|
||||||
|
|
||||||
class IPAddress(types.TypeDecorator):
|
|
||||||
impl = types.LargeBinary
|
|
||||||
cache_ok = True
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
match value:
|
|
||||||
case ipaddress.IPv6Address():
|
|
||||||
value = (value.ipv4_mapped or value).packed
|
|
||||||
case ipaddress.IPv4Address():
|
|
||||||
value = value.packed
|
|
||||||
|
|
||||||
return value
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
if value is not None:
|
|
||||||
value = ipaddress.ip_address(value)
|
|
||||||
if type(value) is ipaddress.IPv6Address:
|
|
||||||
value = value.ipv4_mapped or value
|
|
||||||
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
class File(db.Model):
|
class File(db.Model):
|
||||||
id = db.Column(db.Integer, primary_key = True)
|
id = db.Column(db.Integer, primary_key = True)
|
||||||
sha256 = db.Column(db.String, unique = True)
|
sha256 = db.Column(db.String, unique = True)
|
||||||
ext = db.Column(db.UnicodeText)
|
ext = db.Column(db.UnicodeText)
|
||||||
mime = db.Column(db.UnicodeText)
|
mime = db.Column(db.UnicodeText)
|
||||||
addr = db.Column(IPAddress(16))
|
addr = db.Column(db.UnicodeText)
|
||||||
ua = db.Column(db.UnicodeText)
|
ua = db.Column(db.UnicodeText)
|
||||||
removed = db.Column(db.Boolean, default=False)
|
removed = db.Column(db.Boolean, default=False)
|
||||||
nsfw_score = db.Column(db.Float)
|
nsfw_score = db.Column(db.Float)
|
||||||
|
@ -181,19 +155,18 @@ class File(db.Model):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_nsfw(self) -> bool:
|
def is_nsfw(self) -> bool:
|
||||||
if self.nsfw_score:
|
return self.nsfw_score and self.nsfw_score > app.config["NSFW_THRESHOLD"]
|
||||||
return self.nsfw_score > app.config["NSFW_THRESHOLD"]
|
|
||||||
return False
|
|
||||||
|
|
||||||
def getname(self):
|
def getname(self):
|
||||||
return u"{0}{1}".format(su.enbase(self.id), self.ext)
|
return u"{0}{1}".format(su.enbase(self.id), self.ext)
|
||||||
|
|
||||||
def geturl(self):
|
def geturl(self):
|
||||||
n = self.getname()
|
n = self.getname()
|
||||||
a = "nsfw" if self.is_nsfw else None
|
|
||||||
|
|
||||||
return url_for("get", path=n, secret=self.secret,
|
if self.is_nsfw:
|
||||||
_external=True, _anchor=a) + "\n"
|
return url_for("get", path=n, secret=self.secret, _external=True, _anchor="nsfw") + "\n"
|
||||||
|
else:
|
||||||
|
return url_for("get", path=n, secret=self.secret, _external=True) + "\n"
|
||||||
|
|
||||||
def getpath(self) -> Path:
|
def getpath(self) -> Path:
|
||||||
return Path(app.config["FHOST_STORAGE_PATH"]) / self.sha256
|
return Path(app.config["FHOST_STORAGE_PATH"]) / self.sha256
|
||||||
|
@ -204,37 +177,33 @@ class File(db.Model):
|
||||||
self.removed = permanent
|
self.removed = permanent
|
||||||
self.getpath().unlink(missing_ok=True)
|
self.getpath().unlink(missing_ok=True)
|
||||||
|
|
||||||
"""
|
# Returns the epoch millisecond that a file should expire
|
||||||
Returns the epoch millisecond that a file should expire
|
#
|
||||||
|
# Uses the expiration time provided by the user (requested_expiration)
|
||||||
Uses the expiration time provided by the user (requested_expiration)
|
# upper-bounded by an algorithm that computes the size based on the size of the
|
||||||
upper-bounded by an algorithm that computes the size based on the size of
|
# file.
|
||||||
the file.
|
#
|
||||||
|
# That is, all files are assigned a computed expiration, which can voluntarily
|
||||||
That is, all files are assigned a computed expiration, which can be
|
# shortened by the user either by providing a timestamp in epoch millis or a
|
||||||
voluntarily shortened by the user either by providing a timestamp in
|
# duration in hours.
|
||||||
milliseconds since epoch or a duration in hours.
|
|
||||||
"""
|
|
||||||
@staticmethod
|
|
||||||
def get_expiration(requested_expiration, size) -> int:
|
def get_expiration(requested_expiration, size) -> int:
|
||||||
current_epoch_millis = time.time() * 1000
|
current_epoch_millis = time.time() * 1000;
|
||||||
|
|
||||||
# Maximum lifetime of the file in milliseconds
|
# Maximum lifetime of the file in milliseconds
|
||||||
max_lifespan = get_max_lifespan(size)
|
this_files_max_lifespan = get_max_lifespan(size);
|
||||||
|
|
||||||
# The latest allowed expiration date for this file, in epoch millis
|
# The latest allowed expiration date for this file, in epoch millis
|
||||||
max_expiration = max_lifespan + 1000 * time.time()
|
this_files_max_expiration = this_files_max_lifespan + 1000 * time.time();
|
||||||
|
|
||||||
if requested_expiration is None:
|
if requested_expiration is None:
|
||||||
return max_expiration
|
return this_files_max_expiration
|
||||||
elif requested_expiration < 1650460320000:
|
elif requested_expiration < 1650460320000:
|
||||||
# Treat the requested expiration time as a duration in hours
|
# Treat the requested expiration time as a duration in hours
|
||||||
requested_expiration_ms = requested_expiration * 60 * 60 * 1000
|
requested_expiration_ms = requested_expiration * 60 * 60 * 1000
|
||||||
return min(max_expiration,
|
return min(this_files_max_expiration, current_epoch_millis + requested_expiration_ms)
|
||||||
current_epoch_millis + requested_expiration_ms)
|
|
||||||
else:
|
else:
|
||||||
# Treat expiration time as a timestamp in epoch millis
|
# Treat the requested expiration time as a timestamp in epoch millis
|
||||||
return min(max_expiration, requested_expiration)
|
return min(this_files_max_expiration, requested_expiration)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
requested_expiration can be:
|
requested_expiration can be:
|
||||||
|
@ -242,38 +211,29 @@ class File(db.Model):
|
||||||
- a duration (in hours) that the file should live for
|
- a duration (in hours) that the file should live for
|
||||||
- a timestamp in epoch millis that the file should expire at
|
- a timestamp in epoch millis that the file should expire at
|
||||||
|
|
||||||
Any value greater that the longest allowed file lifespan will be rounded
|
Any value greater that the longest allowed file lifespan will be rounded down to that
|
||||||
down to that value.
|
value.
|
||||||
"""
|
"""
|
||||||
@staticmethod
|
def store(file_, requested_expiration: typing.Optional[int], addr, ua, secret: bool):
|
||||||
def store(file_, requested_expiration: typing.Optional[int], addr, ua,
|
data = file_.read()
|
||||||
secret: bool):
|
digest = sha256(data).hexdigest()
|
||||||
fstream = file_.stream
|
|
||||||
digest = file_digest(fstream, "sha256").hexdigest()
|
|
||||||
fstream.seek(0, os.SEEK_END)
|
|
||||||
flen = fstream.tell()
|
|
||||||
fstream.seek(0)
|
|
||||||
|
|
||||||
def get_mime():
|
def get_mime():
|
||||||
guess = mimedetect.from_descriptor(fstream.fileno())
|
guess = mimedetect.from_buffer(data)
|
||||||
app.logger.debug(f"MIME - specified: '{file_.content_type}' - "
|
app.logger.debug(f"MIME - specified: '{file_.content_type}' - detected: '{guess}'")
|
||||||
f"detected: '{guess}'")
|
|
||||||
|
|
||||||
if (not file_.content_type
|
if not file_.content_type or not "/" in file_.content_type or file_.content_type == "application/octet-stream":
|
||||||
or "/" not in file_.content_type
|
|
||||||
or file_.content_type == "application/octet-stream"):
|
|
||||||
mime = guess
|
mime = guess
|
||||||
else:
|
else:
|
||||||
mime = file_.content_type
|
mime = file_.content_type
|
||||||
|
|
||||||
|
if mime in app.config["FHOST_MIME_BLACKLIST"] or guess in app.config["FHOST_MIME_BLACKLIST"]:
|
||||||
|
abort(415)
|
||||||
|
|
||||||
if len(mime) > 128:
|
if len(mime) > 128:
|
||||||
abort(400)
|
abort(400)
|
||||||
|
|
||||||
for flt in MIMEFilter.query.all():
|
if mime.startswith("text/") and not "charset" in mime:
|
||||||
if flt.check(guess):
|
|
||||||
abort(403, flt.reason)
|
|
||||||
|
|
||||||
if mime.startswith("text/") and "charset" not in mime:
|
|
||||||
mime += "; charset=utf-8"
|
mime += "; charset=utf-8"
|
||||||
|
|
||||||
return mime
|
return mime
|
||||||
|
@ -285,8 +245,7 @@ class File(db.Model):
|
||||||
gmime = mime.split(";")[0]
|
gmime = mime.split(";")[0]
|
||||||
guess = guess_extension(gmime)
|
guess = guess_extension(gmime)
|
||||||
|
|
||||||
app.logger.debug(f"extension - specified: '{ext}' - detected: "
|
app.logger.debug(f"extension - specified: '{ext}' - detected: '{guess}'")
|
||||||
f"'{guess}'")
|
|
||||||
|
|
||||||
if not ext:
|
if not ext:
|
||||||
if gmime in app.config["FHOST_EXT_OVERRIDE"]:
|
if gmime in app.config["FHOST_EXT_OVERRIDE"]:
|
||||||
|
@ -298,7 +257,7 @@ class File(db.Model):
|
||||||
|
|
||||||
return ext[:app.config["FHOST_MAX_EXT_LENGTH"]] or ".bin"
|
return ext[:app.config["FHOST_MAX_EXT_LENGTH"]] or ".bin"
|
||||||
|
|
||||||
expiration = File.get_expiration(requested_expiration, flen)
|
expiration = File.get_expiration(requested_expiration, len(data))
|
||||||
isnew = True
|
isnew = True
|
||||||
|
|
||||||
f = File.query.filter_by(sha256=digest).first()
|
f = File.query.filter_by(sha256=digest).first()
|
||||||
|
@ -329,17 +288,17 @@ class File(db.Model):
|
||||||
if isnew:
|
if isnew:
|
||||||
f.secret = None
|
f.secret = None
|
||||||
if secret:
|
if secret:
|
||||||
f.secret = \
|
f.secret = secrets.token_urlsafe(app.config["FHOST_SECRET_BYTES"])
|
||||||
secrets.token_urlsafe(app.config["FHOST_SECRET_BYTES"])
|
|
||||||
|
|
||||||
storage = Path(app.config["FHOST_STORAGE_PATH"])
|
storage = Path(app.config["FHOST_STORAGE_PATH"])
|
||||||
storage.mkdir(parents=True, exist_ok=True)
|
storage.mkdir(parents=True, exist_ok=True)
|
||||||
p = storage / digest
|
p = storage / digest
|
||||||
|
|
||||||
if not p.is_file():
|
if not p.is_file():
|
||||||
file_.save(p)
|
with open(p, "wb") as of:
|
||||||
|
of.write(data)
|
||||||
|
|
||||||
f.size = flen
|
f.size = len(data)
|
||||||
|
|
||||||
if not f.nsfw_score and app.config["NSFW_DETECT"]:
|
if not f.nsfw_score and app.config["NSFW_DETECT"]:
|
||||||
f.nsfw_score = nsfw.detect(str(p))
|
f.nsfw_score = nsfw.detect(str(p))
|
||||||
|
@ -349,127 +308,6 @@ class File(db.Model):
|
||||||
return f, isnew
|
return f, isnew
|
||||||
|
|
||||||
|
|
||||||
class RequestFilter(db.Model):
|
|
||||||
__tablename__ = "request_filter"
|
|
||||||
id = db.Column(db.Integer, primary_key=True)
|
|
||||||
type = db.Column(db.String(20), index=True, nullable=False)
|
|
||||||
comment = db.Column(db.UnicodeText)
|
|
||||||
|
|
||||||
__mapper_args__ = {
|
|
||||||
"polymorphic_on": type,
|
|
||||||
"with_polymorphic": "*",
|
|
||||||
"polymorphic_identity": "empty"
|
|
||||||
}
|
|
||||||
|
|
||||||
def __init__(self, comment: str = None):
|
|
||||||
self.comment = comment
|
|
||||||
|
|
||||||
|
|
||||||
class AddrFilter(RequestFilter):
|
|
||||||
addr = db.Column(IPAddress(16), unique=True)
|
|
||||||
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "addr"}
|
|
||||||
|
|
||||||
def __init__(self, addr: ipaddress._BaseAddress, comment: str = None):
|
|
||||||
self.addr = addr
|
|
||||||
super().__init__(comment=comment)
|
|
||||||
|
|
||||||
def check(self, addr: ipaddress._BaseAddress) -> bool:
|
|
||||||
if type(addr) is ipaddress.IPv6Address:
|
|
||||||
addr = addr.ipv4_mapped or addr
|
|
||||||
return addr == self.addr
|
|
||||||
|
|
||||||
def check_request(self, r: Request) -> bool:
|
|
||||||
return self.check(ipaddress.ip_address(r.remote_addr))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def reason(self) -> str:
|
|
||||||
return f"Your IP Address ({self.addr.compressed}) is blocked from " \
|
|
||||||
"uploading files."
|
|
||||||
|
|
||||||
|
|
||||||
class IPNetwork(types.TypeDecorator):
|
|
||||||
impl = types.Text
|
|
||||||
cache_ok = True
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
if value is not None:
|
|
||||||
value = value.compressed
|
|
||||||
|
|
||||||
return value
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
if value is not None:
|
|
||||||
value = ipaddress.ip_network(value)
|
|
||||||
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
class NetFilter(RequestFilter):
|
|
||||||
net = db.Column(IPNetwork)
|
|
||||||
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "net"}
|
|
||||||
|
|
||||||
def __init__(self, net: ipaddress._BaseNetwork, comment: str = None):
|
|
||||||
self.net = net
|
|
||||||
super().__init__(comment=comment)
|
|
||||||
|
|
||||||
def check(self, addr: ipaddress._BaseAddress) -> bool:
|
|
||||||
if type(addr) is ipaddress.IPv6Address:
|
|
||||||
addr = addr.ipv4_mapped or addr
|
|
||||||
return addr in self.net
|
|
||||||
|
|
||||||
def check_request(self, r: Request) -> bool:
|
|
||||||
return self.check(ipaddress.ip_address(r.remote_addr))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def reason(self) -> str:
|
|
||||||
return f"Your network ({self.net.compressed}) is blocked from " \
|
|
||||||
"uploading files."
|
|
||||||
|
|
||||||
|
|
||||||
class HasRegex:
|
|
||||||
@declared_attr
|
|
||||||
def regex(cls):
|
|
||||||
return cls.__table__.c.get("regex", db.Column(db.UnicodeText))
|
|
||||||
|
|
||||||
def check(self, s: str) -> bool:
|
|
||||||
return re.match(self.regex, s) is not None
|
|
||||||
|
|
||||||
|
|
||||||
class MIMEFilter(HasRegex, RequestFilter):
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "mime"}
|
|
||||||
|
|
||||||
def __init__(self, mime_regex: str, comment: str = None):
|
|
||||||
self.regex = mime_regex
|
|
||||||
super().__init__(comment=comment)
|
|
||||||
|
|
||||||
def check_request(self, r: Request) -> bool:
|
|
||||||
if "file" in r.files:
|
|
||||||
return self.check(r.files["file"].mimetype)
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
|
||||||
def reason(self) -> str:
|
|
||||||
return "File MIME type not allowed."
|
|
||||||
|
|
||||||
|
|
||||||
class UAFilter(HasRegex, RequestFilter):
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "ua"}
|
|
||||||
|
|
||||||
def __init__(self, ua_regex: str, comment: str = None):
|
|
||||||
self.regex = ua_regex
|
|
||||||
super().__init__(comment=comment)
|
|
||||||
|
|
||||||
def check_request(self, r: Request) -> bool:
|
|
||||||
return self.check(r.user_agent.string)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def reason(self) -> str:
|
|
||||||
return "User agent not allowed."
|
|
||||||
|
|
||||||
|
|
||||||
class UrlEncoder(object):
|
class UrlEncoder(object):
|
||||||
def __init__(self,alphabet, min_length):
|
def __init__(self,alphabet, min_length):
|
||||||
self.alphabet = alphabet
|
self.alphabet = alphabet
|
||||||
|
@ -491,21 +329,17 @@ class UrlEncoder(object):
|
||||||
result += self.alphabet.index(c) * (n ** i)
|
result += self.alphabet.index(c) * (n ** i)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
su = UrlEncoder(alphabet=app.config["URL_ALPHABET"], min_length=1)
|
su = UrlEncoder(alphabet=app.config["URL_ALPHABET"], min_length=1)
|
||||||
|
|
||||||
|
|
||||||
def fhost_url(scheme=None):
|
def fhost_url(scheme=None):
|
||||||
if not scheme:
|
if not scheme:
|
||||||
return url_for(".fhost", _external=True).rstrip("/")
|
return url_for(".fhost", _external=True).rstrip("/")
|
||||||
else:
|
else:
|
||||||
return url_for(".fhost", _external=True, _scheme=scheme).rstrip("/")
|
return url_for(".fhost", _external=True, _scheme=scheme).rstrip("/")
|
||||||
|
|
||||||
|
|
||||||
def is_fhost_url(url):
|
def is_fhost_url(url):
|
||||||
return url.startswith(fhost_url()) or url.startswith(fhost_url("https"))
|
return url.startswith(fhost_url()) or url.startswith(fhost_url("https"))
|
||||||
|
|
||||||
|
|
||||||
def shorten(url):
|
def shorten(url):
|
||||||
if len(url) > app.config["MAX_URL_LENGTH"]:
|
if len(url) > app.config["MAX_URL_LENGTH"]:
|
||||||
abort(414)
|
abort(414)
|
||||||
|
@ -517,6 +351,16 @@ def shorten(url):
|
||||||
|
|
||||||
return u.geturl()
|
return u.geturl()
|
||||||
|
|
||||||
|
def in_upload_bl(addr):
|
||||||
|
if app.config["FHOST_UPLOAD_BLACKLIST"]:
|
||||||
|
with app.open_instance_resource(app.config["FHOST_UPLOAD_BLACKLIST"], "r") as bl:
|
||||||
|
check = addr.lstrip("::ffff:")
|
||||||
|
for l in bl.readlines():
|
||||||
|
if not l.startswith("#"):
|
||||||
|
if check == l.rstrip():
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
"""
|
"""
|
||||||
requested_expiration can be:
|
requested_expiration can be:
|
||||||
|
@ -524,11 +368,13 @@ requested_expiration can be:
|
||||||
- a duration (in hours) that the file should live for
|
- a duration (in hours) that the file should live for
|
||||||
- a timestamp in epoch millis that the file should expire at
|
- a timestamp in epoch millis that the file should expire at
|
||||||
|
|
||||||
Any value greater that the longest allowed file lifespan will be rounded down
|
Any value greater that the longest allowed file lifespan will be rounded down to that
|
||||||
to that value.
|
value.
|
||||||
"""
|
"""
|
||||||
def store_file(f, requested_expiration: typing.Optional[int], addr, ua,
|
def store_file(f, requested_expiration: typing.Optional[int], addr, ua, secret: bool):
|
||||||
secret: bool):
|
if in_upload_bl(addr):
|
||||||
|
return "Your host is blocked from uploading files.\n", 451
|
||||||
|
|
||||||
sf, isnew = File.store(f, requested_expiration, addr, ua, secret)
|
sf, isnew = File.store(f, requested_expiration, addr, ua, secret)
|
||||||
|
|
||||||
response = make_response(sf.geturl())
|
response = make_response(sf.geturl())
|
||||||
|
@ -539,7 +385,6 @@ def store_file(f, requested_expiration: typing.Optional[int], addr, ua,
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
def store_url(url, addr, ua, secret: bool):
|
def store_url(url, addr, ua, secret: bool):
|
||||||
if is_fhost_url(url):
|
if is_fhost_url(url):
|
||||||
abort(400)
|
abort(400)
|
||||||
|
@ -553,14 +398,13 @@ def store_url(url, addr, ua, secret: bool):
|
||||||
return str(e) + "\n"
|
return str(e) + "\n"
|
||||||
|
|
||||||
if "content-length" in r.headers:
|
if "content-length" in r.headers:
|
||||||
length = int(r.headers["content-length"])
|
l = int(r.headers["content-length"])
|
||||||
|
|
||||||
if length <= app.config["MAX_CONTENT_LENGTH"]:
|
if l <= app.config["MAX_CONTENT_LENGTH"]:
|
||||||
def urlfile(**kwargs):
|
def urlfile(**kwargs):
|
||||||
return type('',(),kwargs)()
|
return type('',(),kwargs)()
|
||||||
|
|
||||||
f = urlfile(read=r.raw.read,
|
f = urlfile(read=r.raw.read, content_type=r.headers["content-type"], filename="")
|
||||||
content_type=r.headers["content-type"], filename="")
|
|
||||||
|
|
||||||
return store_file(f, None, addr, ua, secret)
|
return store_file(f, None, addr, ua, secret)
|
||||||
else:
|
else:
|
||||||
|
@ -568,9 +412,10 @@ def store_url(url, addr, ua, secret: bool):
|
||||||
else:
|
else:
|
||||||
abort(411)
|
abort(411)
|
||||||
|
|
||||||
|
|
||||||
def manage_file(f):
|
def manage_file(f):
|
||||||
if request.form["token"] != f.mgmt_token:
|
try:
|
||||||
|
assert(request.form["token"] == f.mgmt_token)
|
||||||
|
except:
|
||||||
abort(401)
|
abort(401)
|
||||||
|
|
||||||
if "delete" in request.form:
|
if "delete" in request.form:
|
||||||
|
@ -589,7 +434,6 @@ def manage_file(f):
|
||||||
|
|
||||||
abort(400)
|
abort(400)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/<path:path>", methods=["GET", "POST"])
|
@app.route("/<path:path>", methods=["GET", "POST"])
|
||||||
@app.route("/s/<secret>/<path:path>", methods=["GET", "POST"])
|
@app.route("/s/<secret>/<path:path>", methods=["GET", "POST"])
|
||||||
def get(path, secret=None):
|
def get(path, secret=None):
|
||||||
|
@ -626,9 +470,7 @@ def get(path, secret=None):
|
||||||
response.headers["Content-Length"] = f.size
|
response.headers["Content-Length"] = f.size
|
||||||
response.headers["X-Accel-Redirect"] = "/" + str(fpath)
|
response.headers["X-Accel-Redirect"] = "/" + str(fpath)
|
||||||
else:
|
else:
|
||||||
response = send_from_directory(
|
response = send_from_directory(app.config["FHOST_STORAGE_PATH"], f.sha256, mimetype = f.mime)
|
||||||
app.config["FHOST_STORAGE_PATH"], f.sha256,
|
|
||||||
mimetype=f.mime)
|
|
||||||
|
|
||||||
response.headers["X-Expires"] = f.expiration
|
response.headers["X-Expires"] = f.expiration
|
||||||
return response
|
return response
|
||||||
|
@ -646,19 +488,11 @@ def get(path, secret=None):
|
||||||
|
|
||||||
abort(404)
|
abort(404)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/", methods=["GET", "POST"])
|
@app.route("/", methods=["GET", "POST"])
|
||||||
def fhost():
|
def fhost():
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
for flt in RequestFilter.query.all():
|
|
||||||
if flt.check_request(request):
|
|
||||||
abort(403, flt.reason)
|
|
||||||
|
|
||||||
sf = None
|
sf = None
|
||||||
secret = "secret" in request.form
|
secret = "secret" in request.form
|
||||||
addr = ipaddress.ip_address(request.remote_addr)
|
|
||||||
if type(addr) is ipaddress.IPv6Address:
|
|
||||||
addr = addr.ipv4_mapped or addr
|
|
||||||
|
|
||||||
if "file" in request.files:
|
if "file" in request.files:
|
||||||
try:
|
try:
|
||||||
|
@ -666,7 +500,7 @@ def fhost():
|
||||||
return store_file(
|
return store_file(
|
||||||
request.files["file"],
|
request.files["file"],
|
||||||
int(request.form["expires"]),
|
int(request.form["expires"]),
|
||||||
addr,
|
request.remote_addr,
|
||||||
request.user_agent.string,
|
request.user_agent.string,
|
||||||
secret
|
secret
|
||||||
)
|
)
|
||||||
|
@ -678,14 +512,14 @@ def fhost():
|
||||||
return store_file(
|
return store_file(
|
||||||
request.files["file"],
|
request.files["file"],
|
||||||
None,
|
None,
|
||||||
addr,
|
request.remote_addr,
|
||||||
request.user_agent.string,
|
request.user_agent.string,
|
||||||
secret
|
secret
|
||||||
)
|
)
|
||||||
elif "url" in request.form:
|
elif "url" in request.form:
|
||||||
return store_url(
|
return store_url(
|
||||||
request.form["url"],
|
request.form["url"],
|
||||||
addr,
|
request.remote_addr,
|
||||||
request.user_agent.string,
|
request.user_agent.string,
|
||||||
secret
|
secret
|
||||||
)
|
)
|
||||||
|
@ -696,17 +530,14 @@ def fhost():
|
||||||
else:
|
else:
|
||||||
return render_template("index.html")
|
return render_template("index.html")
|
||||||
|
|
||||||
|
|
||||||
@app.route("/robots.txt")
|
@app.route("/robots.txt")
|
||||||
def robots():
|
def robots():
|
||||||
return """User-agent: *
|
return """User-agent: *
|
||||||
Disallow: /
|
Disallow: /
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
@app.errorhandler(400)
|
@app.errorhandler(400)
|
||||||
@app.errorhandler(401)
|
@app.errorhandler(401)
|
||||||
@app.errorhandler(403)
|
|
||||||
@app.errorhandler(404)
|
@app.errorhandler(404)
|
||||||
@app.errorhandler(411)
|
@app.errorhandler(411)
|
||||||
@app.errorhandler(413)
|
@app.errorhandler(413)
|
||||||
|
@ -715,23 +546,20 @@ Disallow: /
|
||||||
@app.errorhandler(451)
|
@app.errorhandler(451)
|
||||||
def ehandler(e):
|
def ehandler(e):
|
||||||
try:
|
try:
|
||||||
return render_template(f"{e.code}.html", id=id, request=request,
|
return render_template(f"{e.code}.html", id=id, request=request), e.code
|
||||||
description=e.description), e.code
|
|
||||||
except TemplateNotFound:
|
except TemplateNotFound:
|
||||||
return "Segmentation fault\n", e.code
|
return "Segmentation fault\n", e.code
|
||||||
|
|
||||||
|
|
||||||
@app.cli.command("prune")
|
@app.cli.command("prune")
|
||||||
def prune():
|
def prune():
|
||||||
"""
|
"""
|
||||||
Clean up expired files
|
Clean up expired files
|
||||||
|
|
||||||
Deletes any files from the filesystem which have hit their expiration time.
|
Deletes any files from the filesystem which have hit their expiration time. This
|
||||||
This doesn't remove them from the database, only from the filesystem.
|
doesn't remove them from the database, only from the filesystem. It's recommended
|
||||||
It is recommended that server owners run this command regularly, or set it
|
that server owners run this command regularly, or set it up on a timer.
|
||||||
up on a timer.
|
|
||||||
"""
|
"""
|
||||||
current_time = time.time() * 1000
|
current_time = time.time() * 1000;
|
||||||
|
|
||||||
# The path to where uploaded files are stored
|
# The path to where uploaded files are stored
|
||||||
storage = Path(app.config["FHOST_STORAGE_PATH"])
|
storage = Path(app.config["FHOST_STORAGE_PATH"])
|
||||||
|
@ -745,7 +573,7 @@ def prune():
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
files_removed = 0
|
files_removed = 0;
|
||||||
|
|
||||||
# For every expired file...
|
# For every expired file...
|
||||||
for file in expired_files:
|
for file in expired_files:
|
||||||
|
@ -758,33 +586,31 @@ def prune():
|
||||||
# Remove it from the file system
|
# Remove it from the file system
|
||||||
try:
|
try:
|
||||||
os.remove(file_path)
|
os.remove(file_path)
|
||||||
files_removed += 1
|
files_removed += 1;
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
pass # If the file was already gone, we're good
|
pass # If the file was already gone, we're good
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
print(e)
|
print(e)
|
||||||
print(
|
print(
|
||||||
"\n------------------------------------"
|
"\n------------------------------------"
|
||||||
"Encountered an error while trying to remove file {file_path}."
|
"Encountered an error while trying to remove file {file_path}. Double"
|
||||||
"Make sure the server is configured correctly, permissions "
|
"check to make sure the server is configured correctly, permissions are"
|
||||||
"are okay, and everything is ship shape, then try again.")
|
"okay, and everything is ship shape, then try again.")
|
||||||
return
|
return;
|
||||||
|
|
||||||
# Finally, mark that the file was removed
|
# Finally, mark that the file was removed
|
||||||
file.expiration = None
|
file.expiration = None;
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
print(f"\nDone! {files_removed} file(s) removed")
|
print(f"\nDone! {files_removed} file(s) removed")
|
||||||
|
|
||||||
|
""" For a file of a given size, determine the largest allowed lifespan of that file
|
||||||
|
|
||||||
"""
|
Based on the current app's configuration: Specifically, the MAX_CONTENT_LENGTH, as well
|
||||||
For a file of a given size, determine the largest allowed lifespan of that file
|
as FHOST_{MIN,MAX}_EXPIRATION.
|
||||||
|
|
||||||
Based on the current app's configuration:
|
This lifespan may be shortened by a user's request, but no files should be allowed to
|
||||||
Specifically, the MAX_CONTENT_LENGTH, as well as FHOST_{MIN,MAX}_EXPIRATION.
|
expire at a point after this number.
|
||||||
|
|
||||||
This lifespan may be shortened by a user's request, but no files should be
|
|
||||||
allowed to expire at a point after this number.
|
|
||||||
|
|
||||||
Value returned is a duration in milliseconds.
|
Value returned is a duration in milliseconds.
|
||||||
"""
|
"""
|
||||||
|
@ -794,13 +620,11 @@ def get_max_lifespan(filesize: int) -> int:
|
||||||
max_size = app.config.get("MAX_CONTENT_LENGTH", 256 * 1024 * 1024)
|
max_size = app.config.get("MAX_CONTENT_LENGTH", 256 * 1024 * 1024)
|
||||||
return min_exp + int((-max_exp + min_exp) * (filesize / max_size - 1) ** 3)
|
return min_exp + int((-max_exp + min_exp) * (filesize / max_size - 1) ** 3)
|
||||||
|
|
||||||
|
|
||||||
def do_vscan(f):
|
def do_vscan(f):
|
||||||
if f["path"].is_file():
|
if f["path"].is_file():
|
||||||
with open(f["path"], "rb") as scanf:
|
with open(f["path"], "rb") as scanf:
|
||||||
try:
|
try:
|
||||||
res = list(app.config["VSCAN_SOCKET"].instream(scanf).values())
|
f["result"] = list(app.config["VSCAN_SOCKET"].instream(scanf).values())[0]
|
||||||
f["result"] = res[0]
|
|
||||||
except:
|
except:
|
||||||
f["result"] = ("SCAN FAILED", None)
|
f["result"] = ("SCAN FAILED", None)
|
||||||
else:
|
else:
|
||||||
|
@ -808,12 +632,11 @@ def do_vscan(f):
|
||||||
|
|
||||||
return f
|
return f
|
||||||
|
|
||||||
|
|
||||||
@app.cli.command("vscan")
|
@app.cli.command("vscan")
|
||||||
def vscan():
|
def vscan():
|
||||||
if not app.config["VSCAN_SOCKET"]:
|
if not app.config["VSCAN_SOCKET"]:
|
||||||
print("Error: Virus scanning enabled but no connection method "
|
print("""Error: Virus scanning enabled but no connection method specified.
|
||||||
"specified.\nPlease set VSCAN_SOCKET.")
|
Please set VSCAN_SOCKET.""")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
qp = Path(app.config["VSCAN_QUARANTINE_PATH"])
|
qp = Path(app.config["VSCAN_QUARANTINE_PATH"])
|
||||||
|
@ -827,11 +650,9 @@ def vscan():
|
||||||
File.last_vscan == None),
|
File.last_vscan == None),
|
||||||
File.removed == False)
|
File.removed == False)
|
||||||
else:
|
else:
|
||||||
res = File.query.filter(File.last_vscan == None,
|
res = File.query.filter(File.last_vscan == None, File.removed == False)
|
||||||
File.removed == False)
|
|
||||||
|
|
||||||
work = [{"path": f.getpath(), "name": f.getname(), "id": f.id}
|
work = [{"path" : f.getpath(), "name" : f.getname(), "id" : f.id} for f in res]
|
||||||
for f in res]
|
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
for i, r in enumerate(p.imap_unordered(do_vscan, work)):
|
for i, r in enumerate(p.imap_unordered(do_vscan, work)):
|
||||||
|
@ -846,8 +667,7 @@ def vscan():
|
||||||
|
|
||||||
results.append({
|
results.append({
|
||||||
"id" : r["id"],
|
"id" : r["id"],
|
||||||
"last_vscan": None if r["result"][0] == "SCAN FAILED"
|
"last_vscan" : None if r["result"][0] == "SCAN FAILED" else datetime.datetime.now(),
|
||||||
else datetime.datetime.now(),
|
|
||||||
"removed" : found})
|
"removed" : found})
|
||||||
|
|
||||||
db.session.bulk_update_mappings(File, results)
|
db.session.bulk_update_mappings(File, results)
|
||||||
|
|
|
@ -139,6 +139,30 @@ FHOST_EXT_OVERRIDE = {
|
||||||
"text/x-diff" : ".diff",
|
"text/x-diff" : ".diff",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Control which files aren't allowed to be uploaded
|
||||||
|
#
|
||||||
|
# Certain kinds of files are never accepted. If the file claims to be one of
|
||||||
|
# these types of files, or if we look at the contents of the file and it looks
|
||||||
|
# like one of these filetypes, then we reject the file outright with a 415
|
||||||
|
# UNSUPPORTED MEDIA EXCEPTION
|
||||||
|
FHOST_MIME_BLACKLIST = [
|
||||||
|
"application/x-dosexec",
|
||||||
|
"application/java-archive",
|
||||||
|
"application/java-vm"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# A list of IP addresses which are blacklisted from uploading files
|
||||||
|
#
|
||||||
|
# Can be set to the path of a file with an IP address on each line. The file
|
||||||
|
# can also include comment lines using a pound sign (#). Paths are resolved
|
||||||
|
# relative to the instance/ directory.
|
||||||
|
#
|
||||||
|
# If this is set to None, then no IP blacklist will be consulted.
|
||||||
|
FHOST_UPLOAD_BLACKLIST = None
|
||||||
|
|
||||||
|
|
||||||
# Enables support for detecting NSFW images
|
# Enables support for detecting NSFW images
|
||||||
#
|
#
|
||||||
# Consult README.md for additional dependencies before setting to True
|
# Consult README.md for additional dependencies before setting to True
|
||||||
|
|
|
@ -81,7 +81,6 @@ def run_migrations_online():
|
||||||
finally:
|
finally:
|
||||||
connection.close()
|
connection.close()
|
||||||
|
|
||||||
|
|
||||||
if context.is_offline_mode():
|
if context.is_offline_mode():
|
||||||
run_migrations_offline()
|
run_migrations_offline()
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -15,8 +15,12 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.add_column('file', sa.Column('mgmt_token', sa.String(), nullable=True))
|
op.add_column('file', sa.Column('mgmt_token', sa.String(), nullable=True))
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.drop_column('file', 'mgmt_token')
|
op.drop_column('file', 'mgmt_token')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
|
@ -15,11 +15,13 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
op.create_table('URL',
|
op.create_table('URL',
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
sa.Column('url', sa.UnicodeText(), nullable=True),
|
sa.Column('url', sa.UnicodeText(), nullable=True),
|
||||||
sa.PrimaryKeyConstraint('id'),
|
sa.PrimaryKeyConstraint('id'),
|
||||||
sa.UniqueConstraint('url'))
|
sa.UniqueConstraint('url')
|
||||||
|
)
|
||||||
op.create_table('file',
|
op.create_table('file',
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
sa.Column('id', sa.Integer(), nullable=False),
|
||||||
sa.Column('sha256', sa.String(), nullable=True),
|
sa.Column('sha256', sa.String(), nullable=True),
|
||||||
|
@ -28,9 +30,13 @@ def upgrade():
|
||||||
sa.Column('addr', sa.UnicodeText(), nullable=True),
|
sa.Column('addr', sa.UnicodeText(), nullable=True),
|
||||||
sa.Column('removed', sa.Boolean(), nullable=True),
|
sa.Column('removed', sa.Boolean(), nullable=True),
|
||||||
sa.PrimaryKeyConstraint('id'),
|
sa.PrimaryKeyConstraint('id'),
|
||||||
sa.UniqueConstraint('sha256'))
|
sa.UniqueConstraint('sha256')
|
||||||
|
)
|
||||||
|
### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
|
### commands auto generated by Alembic - please adjust! ###
|
||||||
op.drop_table('file')
|
op.drop_table('file')
|
||||||
op.drop_table('URL')
|
op.drop_table('URL')
|
||||||
|
### end Alembic commands ###
|
||||||
|
|
|
@ -19,7 +19,6 @@ from pathlib import Path
|
||||||
|
|
||||||
Base = automap_base()
|
Base = automap_base()
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
op.add_column('file', sa.Column('size', sa.BigInteger(), nullable=True))
|
op.add_column('file', sa.Column('size', sa.BigInteger(), nullable=True))
|
||||||
bind = op.get_bind()
|
bind = op.get_bind()
|
||||||
|
|
|
@ -1,79 +0,0 @@
|
||||||
"""Add request filters
|
|
||||||
|
|
||||||
Revision ID: 5cda1743b92d
|
|
||||||
Revises: dd0766afb7d2
|
|
||||||
Create Date: 2024-09-27 12:13:16.845981
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision = '5cda1743b92d'
|
|
||||||
down_revision = 'dd0766afb7d2'
|
|
||||||
|
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
|
||||||
from sqlalchemy.ext.automap import automap_base
|
|
||||||
from sqlalchemy.orm import Session
|
|
||||||
from flask import current_app
|
|
||||||
import ipaddress
|
|
||||||
|
|
||||||
Base = automap_base()
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
|
||||||
op.create_table('request_filter',
|
|
||||||
sa.Column('id', sa.Integer(), nullable=False),
|
|
||||||
sa.Column('type', sa.String(length=20), nullable=False),
|
|
||||||
sa.Column('comment', sa.UnicodeText(), nullable=True),
|
|
||||||
sa.Column('addr', sa.LargeBinary(length=16),
|
|
||||||
nullable=True),
|
|
||||||
sa.Column('net', sa.Text(), nullable=True),
|
|
||||||
sa.Column('regex', sa.UnicodeText(), nullable=True),
|
|
||||||
sa.PrimaryKeyConstraint('id'),
|
|
||||||
sa.UniqueConstraint('addr'))
|
|
||||||
|
|
||||||
with op.batch_alter_table('request_filter', schema=None) as batch_op:
|
|
||||||
batch_op.create_index(batch_op.f('ix_request_filter_type'), ['type'],
|
|
||||||
unique=False)
|
|
||||||
|
|
||||||
bind = op.get_bind()
|
|
||||||
Base.prepare(autoload_with=bind)
|
|
||||||
RequestFilter = Base.classes.request_filter
|
|
||||||
session = Session(bind=bind)
|
|
||||||
|
|
||||||
blp = current_app.config.get("FHOST_UPLOAD_BLACKLIST")
|
|
||||||
if blp:
|
|
||||||
with current_app.open_instance_resource(blp, "r") as bl:
|
|
||||||
for line in bl.readlines():
|
|
||||||
if not line.startswith("#"):
|
|
||||||
line = line.strip()
|
|
||||||
if line.endswith(":"):
|
|
||||||
# old implementation uses str.startswith,
|
|
||||||
# which does not translate to networks
|
|
||||||
current_app.logger.warning(
|
|
||||||
f"Ignored address: {line}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
addr = ipaddress.ip_address(line).packed
|
|
||||||
flt = RequestFilter(type="addr", addr=addr)
|
|
||||||
session.add(flt)
|
|
||||||
|
|
||||||
for mime in current_app.config.get("FHOST_MIME_BLACKLIST", []):
|
|
||||||
flt = RequestFilter(type="mime", regex=mime)
|
|
||||||
session.add(flt)
|
|
||||||
|
|
||||||
session.commit()
|
|
||||||
|
|
||||||
w = "Entries in your host and MIME blacklists have been migrated to " \
|
|
||||||
"request filters and stored in the databaes, where possible. " \
|
|
||||||
"The corresponding files and config options may now be deleted. " \
|
|
||||||
"Note that you may have to manually restore them if you wish to " \
|
|
||||||
"revert this with a db downgrade operation."
|
|
||||||
current_app.logger.warning(w)
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
|
||||||
with op.batch_alter_table('request_filter', schema=None) as batch_op:
|
|
||||||
batch_op.drop_index(batch_op.f('ix_request_filter_type'))
|
|
||||||
|
|
||||||
op.drop_table('request_filter')
|
|
|
@ -15,9 +15,12 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
op.add_column('file', sa.Column('last_vscan', sa.DateTime(),
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
nullable=True))
|
op.add_column('file', sa.Column('last_vscan', sa.DateTime(), nullable=True))
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.drop_column('file', 'last_vscan')
|
op.drop_column('file', 'last_vscan')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
|
@ -15,8 +15,12 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.add_column('file', sa.Column('nsfw_score', sa.Float(), nullable=True))
|
op.add_column('file', sa.Column('nsfw_score', sa.Float(), nullable=True))
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.drop_column('file', 'nsfw_score')
|
op.drop_column('file', 'nsfw_score')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
|
@ -21,29 +21,24 @@ from sqlalchemy.orm import Session
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
""" For a file of a given size, determine the largest allowed lifespan of that file
|
||||||
|
|
||||||
"""
|
Based on the current app's configuration: Specifically, the MAX_CONTENT_LENGTH, as well
|
||||||
For a file of a given size, determine the largest allowed lifespan of that file
|
as FHOST_{MIN,MAX}_EXPIRATION.
|
||||||
|
|
||||||
Based on the current app's configuration:
|
This lifespan may be shortened by a user's request, but no files should be allowed to
|
||||||
Specifically, the MAX_CONTENT_LENGTH, as well as FHOST_{MIN,MAX}_EXPIRATION.
|
expire at a point after this number.
|
||||||
|
|
||||||
This lifespan may be shortened by a user's request, but no files should be
|
|
||||||
allowed to expire at a point after this number.
|
|
||||||
|
|
||||||
Value returned is a duration in milliseconds.
|
Value returned is a duration in milliseconds.
|
||||||
"""
|
"""
|
||||||
def get_max_lifespan(filesize: int) -> int:
|
def get_max_lifespan(filesize: int) -> int:
|
||||||
cfg = current_app.config
|
min_exp = current_app.config.get("FHOST_MIN_EXPIRATION", 30 * 24 * 60 * 60 * 1000)
|
||||||
min_exp = cfg.get("FHOST_MIN_EXPIRATION", 30 * 24 * 60 * 60 * 1000)
|
max_exp = current_app.config.get("FHOST_MAX_EXPIRATION", 365 * 24 * 60 * 60 * 1000)
|
||||||
max_exp = cfg.get("FHOST_MAX_EXPIRATION", 365 * 24 * 60 * 60 * 1000)
|
max_size = current_app.config.get("MAX_CONTENT_LENGTH", 256 * 1024 * 1024)
|
||||||
max_size = cfg.get("MAX_CONTENT_LENGTH", 256 * 1024 * 1024)
|
|
||||||
return min_exp + int((-max_exp + min_exp) * (filesize / max_size - 1) ** 3)
|
return min_exp + int((-max_exp + min_exp) * (filesize / max_size - 1) ** 3)
|
||||||
|
|
||||||
|
|
||||||
Base = automap_base()
|
Base = automap_base()
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
op.add_column('file', sa.Column('expiration', sa.BigInteger()))
|
op.add_column('file', sa.Column('expiration', sa.BigInteger()))
|
||||||
|
|
||||||
|
@ -53,7 +48,7 @@ def upgrade():
|
||||||
session = Session(bind=bind)
|
session = Session(bind=bind)
|
||||||
|
|
||||||
storage = Path(current_app.config["FHOST_STORAGE_PATH"])
|
storage = Path(current_app.config["FHOST_STORAGE_PATH"])
|
||||||
current_time = time.time() * 1000
|
current_time = time.time() * 1000;
|
||||||
|
|
||||||
# List of file hashes which have not expired yet
|
# List of file hashes which have not expired yet
|
||||||
# This could get really big for some servers
|
# This could get really big for some servers
|
||||||
|
@ -79,18 +74,13 @@ def upgrade():
|
||||||
for file in files:
|
for file in files:
|
||||||
file_path = storage / file.sha256
|
file_path = storage / file.sha256
|
||||||
stat = os.stat(file_path)
|
stat = os.stat(file_path)
|
||||||
# How long the file is allowed to live, in ms
|
max_age = get_max_lifespan(stat.st_size) # How long the file is allowed to live, in ms
|
||||||
max_age = get_max_lifespan(stat.st_size)
|
file_birth = stat.st_mtime * 1000 # When the file was created, in ms
|
||||||
# When the file was created, in ms
|
updates.append({'id': file.id, 'expiration': int(file_birth + max_age)})
|
||||||
file_birth = stat.st_mtime * 1000
|
|
||||||
updates.append({
|
|
||||||
'id': file.id,
|
|
||||||
'expiration': int(file_birth + max_age)})
|
|
||||||
|
|
||||||
# Apply coalesced updates
|
# Apply coalesced updates
|
||||||
session.bulk_update_mappings(File, updates)
|
session.bulk_update_mappings(File, updates)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
op.drop_column('file', 'expiration')
|
op.drop_column('file', 'expiration')
|
||||||
|
|
|
@ -1,78 +0,0 @@
|
||||||
"""Change File.addr to IPAddress type
|
|
||||||
|
|
||||||
Revision ID: d9a53a28ba54
|
|
||||||
Revises: 5cda1743b92d
|
|
||||||
Create Date: 2024-09-27 14:03:06.764764
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision = 'd9a53a28ba54'
|
|
||||||
down_revision = '5cda1743b92d'
|
|
||||||
|
|
||||||
from alembic import op
|
|
||||||
import sqlalchemy as sa
|
|
||||||
from sqlalchemy.ext.automap import automap_base
|
|
||||||
from sqlalchemy.orm import Session
|
|
||||||
from flask import current_app
|
|
||||||
import ipaddress
|
|
||||||
|
|
||||||
Base = automap_base()
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
|
||||||
with op.batch_alter_table('file', schema=None) as batch_op:
|
|
||||||
batch_op.add_column(sa.Column('addr_tmp', sa.LargeBinary(16),
|
|
||||||
nullable=True))
|
|
||||||
|
|
||||||
bind = op.get_bind()
|
|
||||||
Base.prepare(autoload_with=bind)
|
|
||||||
File = Base.classes.file
|
|
||||||
session = Session(bind=bind)
|
|
||||||
|
|
||||||
updates = []
|
|
||||||
stmt = sa.select(File).where(sa.not_(File.addr == None))
|
|
||||||
for f in session.scalars(stmt.execution_options(yield_per=1000)):
|
|
||||||
addr = ipaddress.ip_address(f.addr)
|
|
||||||
if type(addr) is ipaddress.IPv6Address:
|
|
||||||
addr = addr.ipv4_mapped or addr
|
|
||||||
|
|
||||||
updates.append({
|
|
||||||
"id": f.id,
|
|
||||||
"addr_tmp": addr.packed
|
|
||||||
})
|
|
||||||
session.execute(sa.update(File), updates)
|
|
||||||
|
|
||||||
with op.batch_alter_table('file', schema=None) as batch_op:
|
|
||||||
batch_op.drop_column('addr')
|
|
||||||
batch_op.alter_column('addr_tmp', new_column_name='addr')
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
|
||||||
with op.batch_alter_table('file', schema=None) as batch_op:
|
|
||||||
batch_op.add_column(sa.Column('addr_tmp', sa.UnicodeText,
|
|
||||||
nullable=True))
|
|
||||||
|
|
||||||
bind = op.get_bind()
|
|
||||||
Base.prepare(autoload_with=bind)
|
|
||||||
File = Base.classes.file
|
|
||||||
session = Session(bind=bind)
|
|
||||||
|
|
||||||
updates = []
|
|
||||||
stmt = sa.select(File).where(sa.not_(File.addr == None))
|
|
||||||
for f in session.scalars(stmt.execution_options(yield_per=1000)):
|
|
||||||
addr = ipaddress.ip_address(f.addr)
|
|
||||||
if type(addr) is ipaddress.IPv6Address:
|
|
||||||
addr = addr.ipv4_mapped or addr
|
|
||||||
|
|
||||||
updates.append({
|
|
||||||
"id": f.id,
|
|
||||||
"addr_tmp": addr.compressed
|
|
||||||
})
|
|
||||||
|
|
||||||
session.execute(sa.update(File), updates)
|
|
||||||
|
|
||||||
with op.batch_alter_table('file', schema=None) as batch_op:
|
|
||||||
batch_op.drop_column('addr')
|
|
||||||
batch_op.alter_column('addr_tmp', new_column_name='addr')
|
|
||||||
|
|
|
@ -15,10 +15,16 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
with op.batch_alter_table('file', schema=None) as batch_op:
|
with op.batch_alter_table('file', schema=None) as batch_op:
|
||||||
batch_op.add_column(sa.Column('ua', sa.UnicodeText(), nullable=True))
|
batch_op.add_column(sa.Column('ua', sa.UnicodeText(), nullable=True))
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
with op.batch_alter_table('file', schema=None) as batch_op:
|
with op.batch_alter_table('file', schema=None) as batch_op:
|
||||||
batch_op.drop_column('ua')
|
batch_op.drop_column('ua')
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
|
@ -15,8 +15,12 @@ import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.add_column('file', sa.Column('secret', sa.String(), nullable=True))
|
op.add_column('file', sa.Column('secret', sa.String(), nullable=True))
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
def downgrade():
|
def downgrade():
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
op.drop_column('file', 'secret')
|
op.drop_column('file', 'secret')
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
112
mod.py
112
mod.py
|
@ -11,14 +11,12 @@ from textual.screen import Screen
|
||||||
from textual import log
|
from textual import log
|
||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
from jinja2.filters import do_filesizeformat
|
from jinja2.filters import do_filesizeformat
|
||||||
import ipaddress
|
|
||||||
|
|
||||||
from fhost import db, File, AddrFilter, su, app as fhost_app
|
from fhost import db, File, su, app as fhost_app, in_upload_bl
|
||||||
from modui import FileTable, mime, MpvWidget, Notification
|
from modui import *
|
||||||
|
|
||||||
fhost_app.app_context().push()
|
fhost_app.app_context().push()
|
||||||
|
|
||||||
|
|
||||||
class NullptrMod(Screen):
|
class NullptrMod(Screen):
|
||||||
BINDINGS = [
|
BINDINGS = [
|
||||||
("q", "quit_app", "Quit"),
|
("q", "quit_app", "Quit"),
|
||||||
|
@ -59,7 +57,7 @@ class NullptrMod(Screen):
|
||||||
if self.current_file:
|
if self.current_file:
|
||||||
match fcol:
|
match fcol:
|
||||||
case 1: self.finput.value = ""
|
case 1: self.finput.value = ""
|
||||||
case 2: self.finput.value = self.current_file.addr.compressed
|
case 2: self.finput.value = self.current_file.addr
|
||||||
case 3: self.finput.value = self.current_file.mime
|
case 3: self.finput.value = self.current_file.mime
|
||||||
case 4: self.finput.value = self.current_file.ext
|
case 4: self.finput.value = self.current_file.ext
|
||||||
case 5: self.finput.value = self.current_file.ua or ""
|
case 5: self.finput.value = self.current_file.ua or ""
|
||||||
|
@ -68,58 +66,49 @@ class NullptrMod(Screen):
|
||||||
self.finput.display = False
|
self.finput.display = False
|
||||||
ftable = self.query_one("#ftable")
|
ftable = self.query_one("#ftable")
|
||||||
ftable.focus()
|
ftable.focus()
|
||||||
q = ftable.base_query
|
|
||||||
|
|
||||||
if len(message.value):
|
if len(message.value):
|
||||||
match self.filter_col:
|
match self.filter_col:
|
||||||
case 1:
|
case 1:
|
||||||
try:
|
try: ftable.query = ftable.base_query.filter(File.id == su.debase(message.value))
|
||||||
q = q.filter(File.id == su.debase(message.value))
|
except ValueError: pass
|
||||||
except ValueError:
|
case 2: ftable.query = ftable.base_query.filter(File.addr.like(message.value))
|
||||||
return
|
case 3: ftable.query = ftable.base_query.filter(File.mime.like(message.value))
|
||||||
case 2:
|
case 4: ftable.query = ftable.base_query.filter(File.ext.like(message.value))
|
||||||
try:
|
case 5: ftable.query = ftable.base_query.filter(File.ua.like(message.value))
|
||||||
addr = ipaddress.ip_address(message.value)
|
else:
|
||||||
if type(addr) is ipaddress.IPv6Address:
|
ftable.query = ftable.base_query
|
||||||
addr = addr.ipv4_mapped or addr
|
|
||||||
q = q.filter(File.addr == addr)
|
|
||||||
except ValueError:
|
|
||||||
return
|
|
||||||
case 3: q = q.filter(File.mime.like(message.value))
|
|
||||||
case 4: q = q.filter(File.ext.like(message.value))
|
|
||||||
case 5: q = q.filter(File.ua.like(message.value))
|
|
||||||
|
|
||||||
ftable.query = q
|
|
||||||
|
|
||||||
def action_remove_file(self, permanent: bool) -> None:
|
def action_remove_file(self, permanent: bool) -> None:
|
||||||
if self.current_file:
|
if self.current_file:
|
||||||
self.current_file.delete(permanent)
|
self.current_file.delete(permanent)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
self.mount(Notification(f"{'Banned' if permanent else 'Removed'}"
|
self.mount(Notification(f"{'Banned' if permanent else 'Removed'} file {self.current_file.getname()}"))
|
||||||
f"file {self.current_file.getname()}"))
|
|
||||||
self.action_refresh()
|
self.action_refresh()
|
||||||
|
|
||||||
def action_ban_ip(self, nuke: bool) -> None:
|
def action_ban_ip(self, nuke: bool) -> None:
|
||||||
if self.current_file:
|
if self.current_file:
|
||||||
addr = self.current_file.addr
|
if not fhost_app.config["FHOST_UPLOAD_BLACKLIST"]:
|
||||||
if AddrFilter.query.filter(AddrFilter.addr == addr).scalar():
|
self.mount(Notification("Failed: FHOST_UPLOAD_BLACKLIST not set!"))
|
||||||
txt = f"{addr.compressed} is already banned"
|
return
|
||||||
else:
|
else:
|
||||||
db.session.add(AddrFilter(addr))
|
if in_upload_bl(self.current_file.addr):
|
||||||
db.session.commit()
|
txt = f"{self.current_file.addr} is already banned"
|
||||||
txt = f"Banned {addr.compressed}"
|
else:
|
||||||
|
with fhost_app.open_instance_resource(fhost_app.config["FHOST_UPLOAD_BLACKLIST"], "a") as bl:
|
||||||
|
print(self.current_file.addr.lstrip("::ffff:"), file=bl)
|
||||||
|
txt = f"Banned {self.current_file.addr}"
|
||||||
|
|
||||||
if nuke:
|
if nuke:
|
||||||
tsize = 0
|
tsize = 0
|
||||||
trm = 0
|
trm = 0
|
||||||
for f in File.query.filter(File.addr == addr):
|
for f in File.query.filter(File.addr == self.current_file.addr):
|
||||||
if f.getpath().is_file():
|
if f.getpath().is_file():
|
||||||
tsize += f.size or f.getpath().stat().st_size
|
tsize += f.size or f.getpath().stat().st_size
|
||||||
trm += 1
|
trm += 1
|
||||||
f.delete(True)
|
f.delete(True)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
txt += f", removed {trm} {'files' if trm != 1 else 'file'} " \
|
txt += f", removed {trm} {'files' if trm != 1 else 'file'} totaling {do_filesizeformat(tsize, True)}"
|
||||||
f"totaling {do_filesizeformat(tsize, True)}"
|
|
||||||
self.mount(Notification(txt))
|
self.mount(Notification(txt))
|
||||||
self._refresh_layout()
|
self._refresh_layout()
|
||||||
ftable = self.query_one("#ftable")
|
ftable = self.query_one("#ftable")
|
||||||
|
@ -156,14 +145,11 @@ class NullptrMod(Screen):
|
||||||
self.finput = self.query_one("#filter_input")
|
self.finput = self.query_one("#filter_input")
|
||||||
|
|
||||||
self.mimehandler = mime.MIMEHandler()
|
self.mimehandler = mime.MIMEHandler()
|
||||||
self.mimehandler.register(mime.MIMECategory.Archive,
|
self.mimehandler.register(mime.MIMECategory.Archive, self.handle_libarchive)
|
||||||
self.handle_libarchive)
|
|
||||||
self.mimehandler.register(mime.MIMECategory.Text, self.handle_text)
|
self.mimehandler.register(mime.MIMECategory.Text, self.handle_text)
|
||||||
self.mimehandler.register(mime.MIMECategory.AV, self.handle_mpv)
|
self.mimehandler.register(mime.MIMECategory.AV, self.handle_mpv)
|
||||||
self.mimehandler.register(mime.MIMECategory.Document,
|
self.mimehandler.register(mime.MIMECategory.Document, self.handle_mupdf)
|
||||||
self.handle_mupdf)
|
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_libarchive)
|
||||||
self.mimehandler.register(mime.MIMECategory.Fallback,
|
|
||||||
self.handle_libarchive)
|
|
||||||
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_mpv)
|
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_mpv)
|
||||||
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_raw)
|
self.mimehandler.register(mime.MIMECategory.Fallback, self.handle_raw)
|
||||||
|
|
||||||
|
@ -190,8 +176,7 @@ class NullptrMod(Screen):
|
||||||
self.mpvw.styles.height = "40%"
|
self.mpvw.styles.height = "40%"
|
||||||
self.mpvw.start_mpv("hex://" + imgdata, 0)
|
self.mpvw.start_mpv("hex://" + imgdata, 0)
|
||||||
|
|
||||||
self.ftlog.write(
|
self.ftlog.write(Text.from_markup(f"[bold]Pages:[/bold] {doc.page_count}"))
|
||||||
Text.from_markup(f"[bold]Pages:[/bold] {doc.page_count}"))
|
|
||||||
self.ftlog.write(Text.from_markup("[bold]Metadata:[/bold]"))
|
self.ftlog.write(Text.from_markup("[bold]Metadata:[/bold]"))
|
||||||
for k, v in doc.metadata.items():
|
for k, v in doc.metadata.items():
|
||||||
self.ftlog.write(Text.from_markup(f" [bold]{k}:[/bold] {v}"))
|
self.ftlog.write(Text.from_markup(f" [bold]{k}:[/bold] {v}"))
|
||||||
|
@ -216,8 +201,7 @@ class NullptrMod(Screen):
|
||||||
for k, v in c.metadata.items():
|
for k, v in c.metadata.items():
|
||||||
self.ftlog.write(f" {k}: {v}")
|
self.ftlog.write(f" {k}: {v}")
|
||||||
for s in c.streams:
|
for s in c.streams:
|
||||||
self.ftlog.write(
|
self.ftlog.write(Text(f"Stream {s.index}:", style="bold"))
|
||||||
Text(f"Stream {s.index}:", style="bold"))
|
|
||||||
self.ftlog.write(f" Type: {s.type}")
|
self.ftlog.write(f" Type: {s.type}")
|
||||||
if s.base_rate:
|
if s.base_rate:
|
||||||
self.ftlog.write(f" Frame rate: {s.base_rate}")
|
self.ftlog.write(f" Frame rate: {s.base_rate}")
|
||||||
|
@ -236,31 +220,24 @@ class NullptrMod(Screen):
|
||||||
else:
|
else:
|
||||||
c = chr(s)
|
c = chr(s)
|
||||||
s = c
|
s = c
|
||||||
if c.isalpha():
|
if c.isalpha(): return f"\0[chartreuse1]{s}\0[/chartreuse1]"
|
||||||
return f"\0[chartreuse1]{s}\0[/chartreuse1]"
|
if c.isdigit(): return f"\0[gold1]{s}\0[/gold1]"
|
||||||
if c.isdigit():
|
|
||||||
return f"\0[gold1]{s}\0[/gold1]"
|
|
||||||
if not c.isprintable():
|
if not c.isprintable():
|
||||||
g = "grey50" if c == "\0" else "cadet_blue"
|
g = "grey50" if c == "\0" else "cadet_blue"
|
||||||
return f"\0[{g}]{s if len(s) == 2 else '.'}\0[/{g}]"
|
return f"\0[{g}]{s if len(s) == 2 else '.'}\0[/{g}]"
|
||||||
return s
|
return s
|
||||||
|
return Text.from_markup("\n".join(f"{' '.join(map(fmt, map(''.join, zip(*[iter(c.hex())] * 2))))}"
|
||||||
return Text.from_markup(
|
f"{' ' * (16 - len(c))}"
|
||||||
"\n".join(' '.join(
|
f" {''.join(map(fmt, c))}"
|
||||||
map(fmt, map(''.join, zip(*[iter(c.hex())] * 2)))) +
|
for c in map(lambda x: bytes([n for n in x if n != None]),
|
||||||
f"{' ' * (16 - len(c))} {''.join(map(fmt, c))}"
|
zip_longest(*[iter(binf.read(min(length, 16 * 10)))] * 16))))
|
||||||
for c in
|
|
||||||
map(lambda x: bytes([n for n in x if n is not None]),
|
|
||||||
zip_longest(
|
|
||||||
*[iter(binf.read(min(length, 16 * 10)))] * 16))))
|
|
||||||
|
|
||||||
with open(self.current_file.getpath(), "rb") as binf:
|
with open(self.current_file.getpath(), "rb") as binf:
|
||||||
self.ftlog.write(hexdump(binf, self.current_file.size))
|
self.ftlog.write(hexdump(binf, self.current_file.size))
|
||||||
if self.current_file.size > 16*10*2:
|
if self.current_file.size > 16*10*2:
|
||||||
binf.seek(self.current_file.size-16*10)
|
binf.seek(self.current_file.size-16*10)
|
||||||
self.ftlog.write(" [...] ".center(64, '─'))
|
self.ftlog.write(" [...] ".center(64, '─'))
|
||||||
self.ftlog.write(hexdump(binf,
|
self.ftlog.write(hexdump(binf, self.current_file.size - binf.tell()))
|
||||||
self.current_file.size - binf.tell()))
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -271,24 +248,17 @@ class NullptrMod(Screen):
|
||||||
self.finfo.add_rows([
|
self.finfo.add_rows([
|
||||||
("ID:", str(f.id)),
|
("ID:", str(f.id)),
|
||||||
("File name:", f.getname()),
|
("File name:", f.getname()),
|
||||||
("URL:", f.geturl()
|
("URL:", f.geturl() if fhost_app.config["SERVER_NAME"] else "⚠ Set SERVER_NAME in config.py to display"),
|
||||||
if fhost_app.config["SERVER_NAME"]
|
|
||||||
else "⚠ Set SERVER_NAME in config.py to display"),
|
|
||||||
("File size:", do_filesizeformat(f.size, True)),
|
("File size:", do_filesizeformat(f.size, True)),
|
||||||
("MIME type:", f.mime),
|
("MIME type:", f.mime),
|
||||||
("SHA256 checksum:", f.sha256),
|
("SHA256 checksum:", f.sha256),
|
||||||
("Uploaded by:", Text(f.addr.compressed)),
|
("Uploaded by:", Text(f.addr)),
|
||||||
("User agent:", Text(f.ua or "")),
|
("User agent:", Text(f.ua or "")),
|
||||||
("Management token:", f.mgmt_token),
|
("Management token:", f.mgmt_token),
|
||||||
("Secret:", f.secret),
|
("Secret:", f.secret),
|
||||||
("Is NSFW:", ("Yes" if f.is_nsfw else "No") +
|
("Is NSFW:", ("Yes" if f.is_nsfw else "No") + (f" (Score: {f.nsfw_score:0.4f})" if f.nsfw_score else " (Not scanned)")),
|
||||||
(f" (Score: {f.nsfw_score:0.4f})"
|
|
||||||
if f.nsfw_score else " (Not scanned)")),
|
|
||||||
("Is banned:", "Yes" if f.removed else "No"),
|
("Is banned:", "Yes" if f.removed else "No"),
|
||||||
("Expires:",
|
("Expires:", time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(File.get_expiration(f.expiration, f.size)/1000)))
|
||||||
time.strftime("%Y-%m-%d %H:%M:%S",
|
|
||||||
time.gmtime(File.get_expiration(f.expiration,
|
|
||||||
f.size)/1000)))
|
|
||||||
])
|
])
|
||||||
|
|
||||||
self.mpvw.stop_mpv(True)
|
self.mpvw.stop_mpv(True)
|
||||||
|
@ -298,7 +268,6 @@ class NullptrMod(Screen):
|
||||||
self.mimehandler.handle(f.mime, f.ext)
|
self.mimehandler.handle(f.mime, f.ext)
|
||||||
self.ftlog.scroll_to(x=0, y=0, animate=False)
|
self.ftlog.scroll_to(x=0, y=0, animate=False)
|
||||||
|
|
||||||
|
|
||||||
class NullptrModApp(App):
|
class NullptrModApp(App):
|
||||||
CSS_PATH = "mod.css"
|
CSS_PATH = "mod.css"
|
||||||
|
|
||||||
|
@ -308,7 +277,6 @@ class NullptrModApp(App):
|
||||||
self.install_screen(self.main_screen, name="main")
|
self.install_screen(self.main_screen, name="main")
|
||||||
self.push_screen("main")
|
self.push_screen("main")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
app = NullptrModApp()
|
app = NullptrModApp()
|
||||||
app.run()
|
app.run()
|
||||||
|
|
|
@ -7,14 +7,12 @@ from jinja2.filters import do_filesizeformat
|
||||||
from fhost import File
|
from fhost import File
|
||||||
from modui import mime
|
from modui import mime
|
||||||
|
|
||||||
|
|
||||||
class FileTable(DataTable):
|
class FileTable(DataTable):
|
||||||
query = Reactive(None)
|
query = Reactive(None)
|
||||||
order_col = Reactive(0)
|
order_col = Reactive(0)
|
||||||
order_desc = Reactive(True)
|
order_desc = Reactive(True)
|
||||||
limit = 10000
|
limit = 10000
|
||||||
colmap = [File.id, File.removed, File.nsfw_score, None, File.ext,
|
colmap = [File.id, File.removed, File.nsfw_score, None, File.ext, File.size, File.mime]
|
||||||
File.size, File.mime]
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
@ -35,8 +33,6 @@ class FileTable(DataTable):
|
||||||
|
|
||||||
def watch_query(self, old, value) -> None:
|
def watch_query(self, old, value) -> None:
|
||||||
def fmt_file(f: File) -> tuple:
|
def fmt_file(f: File) -> tuple:
|
||||||
mimemoji = mime.mimemoji.get(f.mime.split('/')[0],
|
|
||||||
mime.mimemoji.get(f.mime)) or ' '
|
|
||||||
return (
|
return (
|
||||||
str(f.id),
|
str(f.id),
|
||||||
"🔴" if f.removed else " ",
|
"🔴" if f.removed else " ",
|
||||||
|
@ -44,15 +40,14 @@ class FileTable(DataTable):
|
||||||
"👻" if not f.getpath().is_file() else " ",
|
"👻" if not f.getpath().is_file() else " ",
|
||||||
f.getname(),
|
f.getname(),
|
||||||
do_filesizeformat(f.size, True),
|
do_filesizeformat(f.size, True),
|
||||||
f"{mimemoji} {f.mime}",
|
f"{mime.mimemoji.get(f.mime.split('/')[0], mime.mimemoji.get(f.mime)) or ' '} " + f.mime,
|
||||||
)
|
)
|
||||||
|
|
||||||
if (self.query):
|
if (self.query):
|
||||||
|
|
||||||
order = FileTable.colmap[self.order_col]
|
order = FileTable.colmap[self.order_col]
|
||||||
q = self.query
|
q = self.query
|
||||||
if order:
|
if order: q = q.order_by(order.desc() if self.order_desc else order, File.id)
|
||||||
q = q.order_by(order.desc() if self.order_desc
|
|
||||||
else order, File.id)
|
|
||||||
qres = list(map(fmt_file, q.limit(self.limit)))
|
qres = list(map(fmt_file, q.limit(self.limit)))
|
||||||
|
|
||||||
ri = 0
|
ri = 0
|
||||||
|
|
|
@ -34,9 +34,9 @@ mimemoji = {
|
||||||
"application/pgp-encrypted" : "🔏",
|
"application/pgp-encrypted" : "🔏",
|
||||||
}
|
}
|
||||||
|
|
||||||
MIMECategory = Enum("MIMECategory", ["Archive", "Text", "AV", "Document",
|
MIMECategory = Enum("MIMECategory",
|
||||||
"Fallback"])
|
["Archive", "Text", "AV", "Document", "Fallback"]
|
||||||
|
)
|
||||||
|
|
||||||
class MIMEHandler:
|
class MIMEHandler:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -115,14 +115,12 @@ class MIMEHandler:
|
||||||
cat = getcat(mime)
|
cat = getcat(mime)
|
||||||
for handler in self.handlers[cat][1]:
|
for handler in self.handlers[cat][1]:
|
||||||
try:
|
try:
|
||||||
if handler(cat):
|
if handler(cat): return
|
||||||
return
|
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
for handler in self.handlers[MIMECategory.Fallback][1]:
|
for handler in self.handlers[MIMECategory.Fallback][1]:
|
||||||
try:
|
try:
|
||||||
if handler(None):
|
if handler(None): return
|
||||||
return
|
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
raise RuntimeError(f"Unhandled MIME type category: {cat}")
|
raise RuntimeError(f"Unhandled MIME type category: {cat}")
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
import time
|
import time
|
||||||
|
import fcntl, struct, termios
|
||||||
import fcntl
|
|
||||||
import struct
|
|
||||||
import termios
|
|
||||||
|
|
||||||
from sys import stdout
|
from sys import stdout
|
||||||
|
|
||||||
from textual import events, log
|
from textual import events, log
|
||||||
|
@ -11,7 +7,6 @@ from textual.widgets import Static
|
||||||
|
|
||||||
from fhost import app as fhost_app
|
from fhost import app as fhost_app
|
||||||
|
|
||||||
|
|
||||||
class MpvWidget(Static):
|
class MpvWidget(Static):
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
@ -19,10 +14,8 @@ class MpvWidget(Static):
|
||||||
self.mpv = None
|
self.mpv = None
|
||||||
self.vo = fhost_app.config.get("MOD_PREVIEW_PROTO")
|
self.vo = fhost_app.config.get("MOD_PREVIEW_PROTO")
|
||||||
|
|
||||||
if self.vo not in ["sixel", "kitty"]:
|
if not self.vo in ["sixel", "kitty"]:
|
||||||
self.update("⚠ Previews not enabled. \n\nSet MOD_PREVIEW_PROTO "
|
self.update("⚠ Previews not enabled. \n\nSet MOD_PREVIEW_PROTO to 'sixel' or 'kitty' in config.py,\nwhichever is supported by your terminal.")
|
||||||
"to 'sixel' or 'kitty' in config.py,\nwhichever is "
|
|
||||||
"supported by your terminal.")
|
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
import mpv
|
import mpv
|
||||||
|
@ -34,35 +27,28 @@ class MpvWidget(Static):
|
||||||
self.mpv[f"vo-sixel-buffered"] = True
|
self.mpv[f"vo-sixel-buffered"] = True
|
||||||
self.mpv["audio"] = False
|
self.mpv["audio"] = False
|
||||||
self.mpv["loop-file"] = "inf"
|
self.mpv["loop-file"] = "inf"
|
||||||
self.mpv["image-display-duration"] = 0.5 \
|
self.mpv["image-display-duration"] = 0.5 if self.vo == "sixel" else "inf"
|
||||||
if self.vo == "sixel" else "inf"
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.mpv = None
|
self.mpv = None
|
||||||
self.update("⚠ Previews require python-mpv with libmpv "
|
self.update(f"⚠ Previews require python-mpv with libmpv 0.36.0 or later \n\nError was:\n{type(e).__name__}: {e}")
|
||||||
"0.36.0 or later \n\nError was:\n"
|
|
||||||
f"{type(e).__name__}: {e}")
|
|
||||||
|
|
||||||
def start_mpv(self, f: str | None = None,
|
def start_mpv(self, f: str|None = None, pos: float|str|None = None) -> None:
|
||||||
pos: float | str | None = None) -> None:
|
|
||||||
self.display = True
|
self.display = True
|
||||||
self.screen._refresh_layout()
|
self.screen._refresh_layout()
|
||||||
|
|
||||||
if self.mpv:
|
if self.mpv:
|
||||||
if self.content_region.x:
|
if self.content_region.x:
|
||||||
winsz = fcntl.ioctl(0, termios.TIOCGWINSZ, '12345678')
|
r, c, w, h = struct.unpack('hhhh', fcntl.ioctl(0, termios.TIOCGWINSZ, '12345678'))
|
||||||
r, c, w, h = struct.unpack('hhhh', winsz)
|
|
||||||
width = int((w / c) * self.content_region.width)
|
width = int((w / c) * self.content_region.width)
|
||||||
height = int((h / r) * (self.content_region.height +
|
height = int((h / r) * (self.content_region.height + (1 if self.vo == "sixel" else 0)))
|
||||||
(1 if self.vo == "sixel" else 0)))
|
|
||||||
self.mpv[f"vo-{self.vo}-left"] = self.content_region.x + 1
|
self.mpv[f"vo-{self.vo}-left"] = self.content_region.x + 1
|
||||||
self.mpv[f"vo-{self.vo}-top"] = self.content_region.y + 1
|
self.mpv[f"vo-{self.vo}-top"] = self.content_region.y + 1
|
||||||
self.mpv[f"vo-{self.vo}-rows"] = self.content_region.height + \
|
self.mpv[f"vo-{self.vo}-rows"] = self.content_region.height + (1 if self.vo == "sixel" else 0)
|
||||||
(1 if self.vo == "sixel" else 0)
|
|
||||||
self.mpv[f"vo-{self.vo}-cols"] = self.content_region.width
|
self.mpv[f"vo-{self.vo}-cols"] = self.content_region.width
|
||||||
self.mpv[f"vo-{self.vo}-width"] = width
|
self.mpv[f"vo-{self.vo}-width"] = width
|
||||||
self.mpv[f"vo-{self.vo}-height"] = height
|
self.mpv[f"vo-{self.vo}-height"] = height
|
||||||
|
|
||||||
if pos is not None:
|
if pos != None:
|
||||||
self.mpv["start"] = pos
|
self.mpv["start"] = pos
|
||||||
|
|
||||||
if f:
|
if f:
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
from textual.widgets import Static
|
from textual.widgets import Static
|
||||||
|
|
||||||
|
|
||||||
class Notification(Static):
|
class Notification(Static):
|
||||||
def on_mount(self) -> None:
|
def on_mount(self) -> None:
|
||||||
self.set_timer(3, self.remove)
|
self.set_timer(3, self.remove)
|
||||||
|
|
|
@ -18,34 +18,32 @@
|
||||||
and limitations under the License.
|
and limitations under the License.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
import av
|
import av
|
||||||
from transformers import pipeline
|
from transformers import pipeline
|
||||||
|
|
||||||
|
|
||||||
class NSFWDetector:
|
class NSFWDetector:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.classifier = pipeline("image-classification",
|
self.classifier = pipeline("image-classification", model="giacomoarienti/nsfw-classifier")
|
||||||
model="giacomoarienti/nsfw-classifier")
|
|
||||||
|
|
||||||
def detect(self, fpath):
|
def detect(self, fpath):
|
||||||
try:
|
try:
|
||||||
with av.open(fpath) as container:
|
with av.open(fpath) as container:
|
||||||
try:
|
try: container.seek(int(container.duration / 2))
|
||||||
container.seek(int(container.duration / 2))
|
|
||||||
except: container.seek(0)
|
except: container.seek(0)
|
||||||
|
|
||||||
frame = next(container.decode(video=0))
|
frame = next(container.decode(video=0))
|
||||||
img = frame.to_image()
|
img = frame.to_image()
|
||||||
res = self.classifier(img)
|
res = self.classifier(img)
|
||||||
|
|
||||||
return max([x["score"] for x in res
|
return max([x["score"] for x in res if x["label"] not in ["neutral", "drawings"]])
|
||||||
if x["label"] not in ["neutral", "drawings"]])
|
|
||||||
except: pass
|
except: pass
|
||||||
|
|
||||||
return -1.0
|
return -1.0
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
n = NSFWDetector()
|
n = NSFWDetector()
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@ Jinja2
|
||||||
Flask
|
Flask
|
||||||
flask_sqlalchemy
|
flask_sqlalchemy
|
||||||
python_magic
|
python_magic
|
||||||
ipaddress
|
|
||||||
|
|
||||||
# vscan
|
# vscan
|
||||||
clamd
|
clamd
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
{{ description if description else "Your host is banned." }}
|
|
|
@ -37,6 +37,7 @@ To change the expiration date (see above):
|
||||||
|
|
||||||
{% set max_size = config["MAX_CONTENT_LENGTH"]|filesizeformat(True) %}
|
{% set max_size = config["MAX_CONTENT_LENGTH"]|filesizeformat(True) %}
|
||||||
Maximum file size: {{ max_size }}
|
Maximum file size: {{ max_size }}
|
||||||
|
Not allowed: {{ config["FHOST_MIME_BLACKLIST"]|join(", ") }}
|
||||||
|
|
||||||
|
|
||||||
FILE RETENTION PERIOD
|
FILE RETENTION PERIOD
|
||||||
|
|
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
81
tests/test_client.py
Normal file
81
tests/test_client.py
Normal file
|
@ -0,0 +1,81 @@
|
||||||
|
import pytest
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
from flask_migrate import upgrade as db_upgrade
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
from fhost import app, db, url_for, File, URL
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def client():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{tmpdir}/db.sqlite"
|
||||||
|
app.config["FHOST_STORAGE_PATH"] = os.path.join(tmpdir, "up")
|
||||||
|
app.config["TESTING"] = True
|
||||||
|
|
||||||
|
with app.test_client() as client:
|
||||||
|
with app.app_context():
|
||||||
|
db_upgrade()
|
||||||
|
yield client
|
||||||
|
|
||||||
|
def test_client(client):
|
||||||
|
payloads = [
|
||||||
|
({ "file" : (BytesIO(b"hello"), "hello.txt") }, 200, b"https://localhost/E.txt\n"),
|
||||||
|
({ "file" : (BytesIO(b"hello"), "hello.ignorethis") }, 200, b"https://localhost/E.txt\n"),
|
||||||
|
({ "file" : (BytesIO(b"bye"), "bye.truncatethis") }, 200, b"https://localhost/Q.truncate\n"),
|
||||||
|
({ "file" : (BytesIO(b"hi"), "hi.tar.gz") }, 200, b"https://localhost/h.tar.gz\n"),
|
||||||
|
({ "file" : (BytesIO(b"lea!"), "lea!") }, 200, b"https://localhost/d.txt\n"),
|
||||||
|
({ "file" : (BytesIO(b"why?"), "balls", "application/x-dosexec") }, 415, None),
|
||||||
|
({ "shorten" : "https://0x0.st" }, 200, b"https://localhost/E\n"),
|
||||||
|
({ "shorten" : "https://localhost" }, 400, None),
|
||||||
|
({}, 400, None),
|
||||||
|
]
|
||||||
|
|
||||||
|
for p, s, r in payloads:
|
||||||
|
rv = client.post("/", buffered=True,
|
||||||
|
content_type="multipart/form-data",
|
||||||
|
data=p)
|
||||||
|
assert rv.status_code == s
|
||||||
|
if r:
|
||||||
|
assert rv.data == r
|
||||||
|
|
||||||
|
f = File.query.get(2)
|
||||||
|
f.removed = True
|
||||||
|
db.session.add(f)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
rq = [
|
||||||
|
(200, [
|
||||||
|
"/",
|
||||||
|
"robots.txt",
|
||||||
|
"E.txt",
|
||||||
|
"E.txt/test",
|
||||||
|
"E.txt/test.py",
|
||||||
|
"d.txt",
|
||||||
|
"h.tar.gz",
|
||||||
|
]),
|
||||||
|
(302, [
|
||||||
|
"E",
|
||||||
|
]),
|
||||||
|
(404, [
|
||||||
|
"test.bin",
|
||||||
|
"test.bin/test",
|
||||||
|
"test.bin/test.py",
|
||||||
|
"test",
|
||||||
|
"test/test",
|
||||||
|
"test.bin/test.py",
|
||||||
|
"E.bin",
|
||||||
|
"E/test",
|
||||||
|
"E/test.bin",
|
||||||
|
]),
|
||||||
|
(451, [
|
||||||
|
"Q.truncate",
|
||||||
|
]),
|
||||||
|
]
|
||||||
|
|
||||||
|
for code, paths in rq:
|
||||||
|
for p in paths:
|
||||||
|
app.logger.info(f"GET {p}")
|
||||||
|
rv = client.get(p)
|
||||||
|
assert rv.status_code == code
|
||||||
|
|
Loading…
Reference in a new issue