jellyfin-accounts/jellyfin_accounts/web_api.py
Harvey Tindall 2e20466925 Self-restarting for config changes
When changing settings that need restart, the option is now available to
do it automatically. Functions on linux at least, might need testing on
    windows.
2020-07-20 15:37:19 +01:00

508 lines
18 KiB
Python

# A bit of a mess, but mostly does API endpoints and a couple of compatibility fixes
from flask import request, jsonify
from jellyfin_accounts.jf_api import Jellyfin
import json
import datetime
import secrets
import time
import threading
import os
import sys
import psutil
from jellyfin_accounts import (
config,
config_path,
app,
g,
data_store,
resp,
configparser,
config_base_path,
)
from jellyfin_accounts.email import Mailgun, Smtp
from jellyfin_accounts import web_log as log
from jellyfin_accounts.validate_password import PasswordValidator
def format_datetime(dt):
    """Render ``dt`` as a date followed by a clock time.

    The date part is formatted with the ``date_format`` option of the
    ``[email]`` config section. The time part is appended after a single
    space: ``HH:MM`` when ``use_24h`` is enabled in that section, otherwise
    12-hour ``HH:MM`` with an AM/PM marker.
    """
    date_part = dt.strftime(config["email"]["date_format"])
    clock_format = "%H:%M" if config.getboolean("email", "use_24h") else "%I:%M %p"
    return f"{date_part} {dt.strftime(clock_format)}"
def _send_expiry_notifications(invite, details, expiry):
    """Email every address that asked to be told when ``invite`` expires.

    ``details`` is the stored invite dict. Only addresses whose entry under
    ``details["notify"]`` carries a truthy ``notify-expiry`` flag are mailed,
    and each message is sent on its own thread.
    """
    for address in details["notify"]:
        settings = details["notify"][address]
        if "notify-expiry" not in settings or not settings["notify-expiry"]:
            continue
        method = config["email"]["method"]
        if method == "mailgun":
            email = Mailgun(address)
        elif method == "smtp":
            email = Smtp(address)
        else:
            # Without this branch `email` would be unbound and the whole
            # housekeeping pass would crash before deleting the invite.
            log.error(
                f"Unknown email method '{method}', expiry notification not sent"
            )
            return
        if email.construct_expiry({"code": invite, "expiry": expiry}):
            threading.Thread(target=email.send).start()


def checkInvite(code, used=False, username=None):
    """Check whether ``code`` is a live invite, doing housekeeping on the way.

    Every stored invite is inspected. Invites that have passed their
    ``valid_till`` time, or that are limited and have fewer than one use
    remaining, are deleted from ``data_store.invites`` (after optional expiry
    notifications when notifications are enabled in the config).

    Args:
        code: Invite code to look for.
        used: When true and the invite is live, record one use: the
            username and formatted time are appended to ``used-by`` and the
            remaining-use count is decremented, or the invite is deleted if
            this was its last use.
        username: Name recorded in ``used-by`` when ``used`` is true.

    Returns:
        True if ``code`` matched an invite that was still live, else False.
    """
    current_time = datetime.datetime.now()
    # Work on a snapshot so invites can be deleted from the store mid-loop.
    invites = dict(data_store.invites)
    match = False
    for invite in invites:
        details = invites[invite]
        # Invites stored without any use information count as single-use.
        if "remaining-uses" not in details and "no-limit" not in details:
            details["remaining-uses"] = 1
        expiry = datetime.datetime.strptime(
            details["valid_till"], "%Y-%m-%dT%H:%M:%S.%f"
        )
        exhausted = "no-limit" not in details and details["remaining-uses"] < 1
        if current_time >= expiry or exhausted:
            log.debug(f"Housekeeping: Deleting expired invite {invite}")
            if config.getboolean("notifications", "enabled") and "notify" in details:
                _send_expiry_notifications(invite, details, expiry)
            del data_store.invites[invite]
        elif invite == code:
            match = True
            if not used:
                continue
            inv = dict(data_store.invites[code])
            if "used-by" not in inv:
                inv["used-by"] = []
            # Apply the single-use default to this copy as well, so such an
            # invite is consumed here rather than written back still usable.
            if "remaining-uses" not in inv and "no-limit" not in inv:
                inv["remaining-uses"] = 1
            delete = False
            if "remaining-uses" in inv:
                if inv["remaining-uses"] == 1:
                    delete = True
                    del data_store.invites[code]
                elif "no-limit" not in details:
                    inv["remaining-uses"] -= 1
            inv["used-by"].append([username, format_datetime(current_time)])
            if not delete:
                data_store.invites[code] = inv
    return match
# Module-wide Jellyfin API client, built from the [jellyfin] config section.
jf = Jellyfin(
    config["jellyfin"]["server"],
    config["jellyfin"]["client"],
    config["jellyfin"]["version"],
    config["jellyfin"]["device"],
    config["jellyfin"]["device_id"],
)

# NOTE(review): this import deliberately stays below the creation of `jf`;
# presumably the login module needs `jf` from here - confirm before moving.
from jellyfin_accounts.login import auth

jf_address = config["jellyfin"]["server"]

# Try to log in to Jellyfin up to three times, pausing 5 seconds after each
# failed attempt.
success = False
for i in range(3):
    try:
        jf.authenticate(config["jellyfin"]["username"], config["jellyfin"]["password"])
        success = True
        log.info(f"Successfully authenticated with {jf_address}")
        break
    except Jellyfin.AuthenticationError:
        log.error(f"Failed to authenticate with {jf_address}, Retrying...")
        time.sleep(5)
if not success:
    log.error("Could not authenticate after 3 tries.")
    # `exit()` is a convenience injected by the `site` module and reports
    # success (status 0); use sys.exit with a non-zero status instead so the
    # failure is visible to whatever launched the process.
    sys.exit(1)
# Temporary fixes below.
def switchToIds():
    """Re-key the emails file by Jellyfin user ID instead of username.

    Loads the JSON file named by ``config["files"]["emails"]`` (treated as
    empty when it is missing or not valid JSON) and compares each key with
    the Name and Id of every user returned by ``jf.getUsers(public=False)``.
    If at least one key is a username, the file is rewritten with the
    matched entries keyed by user ID; otherwise the file is left untouched.
    """
    emails_path = config["files"]["emails"]
    try:
        with open(emails_path, "r") as f:
            emails = json.load(f)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        emails = {}

    users = jf.getUsers(public=False)
    by_id = {}
    found_username = False
    for key, address in emails.items():
        for user in users:
            if user["Name"] == key:
                found_username = True
            elif user["Id"] != key:
                continue
            by_id[user["Id"]] = address

    # Nothing keyed by username: there is nothing to migrate.
    if not found_username:
        return

    from pathlib import Path

    email_file = Path(emails_path).name
    log.info(
        f"{email_file} modified to use userID instead of "
        "usernames. These will be used in future."
    )
    with open(emails_path, "w") as f:
        json.dump(by_id, f, indent=4)
# Temporary, switches emails.json over from using Usernames to User IDs.
switchToIds()

from packaging import version

# Provider IDs stored in older user templates and the values that replace
# them when the connected server reports version 10.6.0 or newer.
_EMBY_AUTH_PROVIDER = (
    "Emby.Server.Implementations.Library.DefaultAuthenticationProvider"
)
_JELLYFIN_AUTH_PROVIDER = (
    "Jellyfin.Server.Implementations.Users.DefaultAuthenticationProvider"
)
_EMBY_RESET_PROVIDER = (
    "Emby.Server.Implementations.Library.DefaultPasswordResetProvider"
)
_JELLYFIN_RESET_PROVIDER = (
    "Jellyfin.Server.Implementations.Users.DefaultPasswordResetProvider"
)

# Only touch the template when the server is new enough and a template has
# actually been stored.
if version.parse(jf.info["Version"]) >= version.parse("10.6.0") and bool(
    data_store.user_template
):
    if data_store.user_template["AuthenticationProviderId"] == _EMBY_AUTH_PROVIDER:
        log.info("Updating user_template for Jellyfin >= 10.6.0")
        data_store.user_template["AuthenticationProviderId"] = _JELLYFIN_AUTH_PROVIDER
    if data_store.user_template["PasswordResetProviderId"] == _EMBY_RESET_PROVIDER:
        data_store.user_template["PasswordResetProviderId"] = _JELLYFIN_RESET_PROVIDER
def validator():
    """Build the PasswordValidator described by the config.

    When ``[password_validation] enabled`` is true, the validator is
    constructed from that section's ``min_length``, ``upper``, ``lower``,
    ``number`` and ``special`` options (in that order). Otherwise a
    validator constructed with all five arguments set to 0 is returned.
    """
    if not config.getboolean("password_validation", "enabled"):
        return PasswordValidator(0, 0, 0, 0, 0)
    option_names = ("min_length", "upper", "lower", "number", "special")
    requirements = [config["password_validation"][name] for name in option_names]
    return PasswordValidator(*requirements)
@app.route("/newUser", methods=["POST"])
def newUser():
    """Create a Jellyfin account from an invite.

    Reads ``code``, ``username``, ``password`` (and ``email`` when password
    resets are enabled) from the JSON request body.

    Returns:
        * ``resp(False, code=401)`` when the invite code is not valid.
        * The password validation result as JSON when the password fails
          validation, and also after a successful creation.
        * ``{"error": ...}`` as JSON when ``jf.newUser`` raises.
        * ``resp(False)`` when the server answers with a status other
          than 200.
    """
    data = request.get_json()
    log.debug("Attempted newUser")
    if not checkInvite(data["code"]):
        log.debug("Attempted newUser unauthorized")
        return resp(False, code=401)

    validation = validator().validate(data["password"])
    if any(validation[criterion] is False for criterion in validation):
        log.debug("User password invalid")
        return jsonify(validation)
    log.debug("User password valid")

    try:
        user = jf.newUser(data["username"], data["password"])
    except Jellyfin.UserExistsError:
        error = f'User already exists named {data["username"]}'
        log.debug(error)
        return jsonify({"error": error})
    except Exception as e:
        # Catch Exception rather than everything, so SystemExit and
        # KeyboardInterrupt still propagate, and record what went wrong.
        log.error(f"New user creation failed: {type(e).__name__}")
        return jsonify({"error": "Unknown error"})

    # Snapshot the invites before recording the use: recording it may delete
    # the invite, and its notification settings are still needed below.
    invites = dict(data_store.invites)
    checkInvite(data["code"], used=True, username=data["username"])
    if (
        config.getboolean("notifications", "enabled")
        and "notify" in invites[data["code"]]
    ):
        notify = invites[data["code"]]["notify"]
        for address in notify:
            if "notify-creation" not in notify[address]:
                continue
            if not notify[address]["notify-creation"]:
                continue
            method = config["email"]["method"]
            if method == "mailgun":
                email = Mailgun(address)
            elif method == "smtp":
                email = Smtp(address)
            else:
                # `email` would otherwise be unbound and the request would
                # fail after the account had already been created.
                log.error(
                    f"Unknown email method '{method}', creation notification not sent"
                )
                break
            if email.construct_created(
                {
                    "code": data["code"],
                    "username": data["username"],
                    "created": datetime.datetime.now(),
                }
            ):
                threading.Thread(target=email.send).start()

    if user.status_code != 200:
        log.error(f"New user creation failed: {user.status_code}")
        return resp(False)

    # Applying the stored defaults is best-effort: a failure is logged but
    # does not undo or fail the account creation.
    try:
        policy = data_store.user_template
        if policy != {}:
            jf.setPolicy(user.json()["Id"], policy)
        else:
            log.debug("user policy was blank")
    except Exception:
        log.error("Failed to set new user policy")
    try:
        configuration = data_store.user_configuration
        displayprefs = data_store.user_displayprefs
        if configuration != {} and displayprefs != {}:
            if jf.setConfiguration(user.json()["Id"], configuration):
                jf.setDisplayPreferences(user.json()["Id"], displayprefs)
                log.debug("Set homescreen layout.")
        else:
            log.debug("user configuration and/or displayprefs were blank")
    except Exception:
        log.error("Failed to set new user homescreen layout")

    if config.getboolean("password_resets", "enabled"):
        data_store.emails[user.json()["Id"]] = data["email"]
        log.debug("Email address stored")
    log.info("New user created")
    return jsonify(validation)
@app.route("/generateInvite", methods=["POST"])
@auth.login_required
def generateInvite():
    """Create and store a new invite, optionally emailing it.

    The JSON request body supplies ``days``/``hours``/``minutes`` (the
    validity period), ``multiple-uses``, ``no-limit`` and ``remaining-uses``
    (the use limit), and optionally ``email``, ``notify-creation`` and
    ``notify-expiry``. The invite is saved in ``data_store.invites`` under a
    freshly generated URL-safe code.

    Returns:
        ``resp()`` once the invite has been stored.
    """
    current_time = datetime.datetime.now()
    data = request.get_json()
    delta = datetime.timedelta(
        days=int(data["days"]), hours=int(data["hours"]), minutes=int(data["minutes"])
    )
    invite_code = secrets.token_urlsafe(16)
    invite = {"created": format_datetime(current_time)}
    if data["multiple-uses"]:
        if data["no-limit"]:
            invite["no-limit"] = True
        else:
            invite["remaining-uses"] = int(data["remaining-uses"])
    else:
        invite["remaining-uses"] = 1
    log.debug(f"Creating new invite: {invite_code}")
    valid_till = current_time + delta
    invite["valid_till"] = valid_till.strftime("%Y-%m-%dT%H:%M:%S.%f")

    if "email" in data and config.getboolean("invite_emails", "enabled"):
        address = data["email"]
        invite["email"] = address
        log.info(f"Sending invite to {address}")
        method = config["email"]["method"]
        # Mailgun and Smtp are already imported at module level.
        if method == "mailgun":
            email = Mailgun(address)
        elif method == "smtp":
            email = Smtp(address)
        else:
            # `email` would otherwise be unbound; record the failure on the
            # invite instead of aborting the request.
            email = None
            log.error(f"Unknown email method '{method}', invite email not sent")
        if email is None:
            invite["email"] = f"Failed to send to {address}"
        else:
            email.construct_invite({"expiry": valid_till, "code": invite_code})
            response = email.send()
            # Anything other than the literal True counts as a failed send.
            if response is not True:
                invite["email"] = f"Failed to send to {address}"

    if config.getboolean("notifications", "enabled"):
        # NOTE(review): these flags are stored at the top level of the
        # invite, while checkInvite, getInvites and setNotify use
        # invite["notify"][address][...]. Confirm whether they should be
        # stored per address here too.
        for flag in ("notify-creation", "notify-expiry"):
            if flag in data:
                invite[flag] = data[flag]

    data_store.invites[invite_code] = invite
    log.info(f"New invite created: {invite_code}")
    return resp()
@app.route("/getInvites", methods=["GET"])
@auth.login_required
def getInvites():
    """Return all live invites as JSON.

    Expired or used-up invites are removed first. Each remaining invite is
    reported with its code, the time left (``days``, ``hours``, ``minutes``),
    and whichever of ``created``, ``used-by``, ``no-limit``, ``email`` are
    stored. ``remaining-uses`` defaults to 1 when it is not stored. The
    ``notify-expiry``/``notify-creation`` flags are included only for the
    current admin's address.

    Returns:
        JSON of the form ``{"invites": [...]}``.
    """
    log.debug("Invites requested")
    current_time = datetime.datetime.now()
    # A single checkInvite call already sweeps every stored invite, so one
    # pass is enough; None never equals a stored code.
    checkInvite(None)
    invites = dict(data_store.invites)
    response = {"invites": []}
    for code, details in invites.items():
        expiry = datetime.datetime.strptime(
            details["valid_till"], "%Y-%m-%dT%H:%M:%S.%f"
        )
        valid_for = expiry - current_time
        invite = {
            "code": code,
            "days": valid_for.days,
            "hours": valid_for.seconds // 3600,
            "minutes": (valid_for.seconds // 60) % 60,
        }
        for key in ("created", "used-by", "no-limit"):
            if key in details:
                invite[key] = details[key]
        if "remaining-uses" in details:
            invite["remaining-uses"] = details["remaining-uses"]
        else:
            invite["remaining-uses"] = 1
        if "email" in details:
            invite["email"] = details["email"]
        if "notify" in details:
            # Notification settings are stored per address; only the ones
            # belonging to the current admin are reported.
            if config.getboolean("ui", "jellyfin_login"):
                address = data_store.emails[g.user.id]
            else:
                address = config["ui"]["email"]
            if address in details["notify"]:
                settings = details["notify"][address]
                for key in ("notify-expiry", "notify-creation"):
                    if key in settings:
                        invite[key] = settings[key]
        response["invites"].append(invite)
    return jsonify(response)
@app.route("/deleteInvite", methods=["POST"])
@auth.login_required
def deleteInvite():
    """Delete the invite whose ``code`` is given in the JSON request body.

    An unknown code is not an error: nothing is deleted, and the same
    ``resp()`` is returned.
    """
    code = request.get_json()["code"]
    invites = dict(data_store.invites)
    if code in invites:
        del data_store.invites[code]
        # Only report a deletion when one actually happened.
        log.info(f"Invite deleted: {code}")
    else:
        log.debug(f"Invite not found, nothing deleted: {code}")
    return resp()
@app.route("/getToken")
@auth.login_required
def get_token():
    """Generate a token for the logged-in user and return it as JSON.

    The value from ``g.user.generate_token()`` is decoded as ASCII and
    returned under the ``token`` key.
    """
    raw_token = g.user.generate_token()
    payload = {"token": raw_token.decode("ascii")}
    return jsonify(payload)
@app.route("/getUsers", methods=["GET"])
@auth.login_required
def getUsers():
    """Return the Jellyfin users, each with its stored email if there is one.

    Returns:
        JSON of the form ``{"users": [{"name": ..., "email": ...}, ...]}``;
        ``email`` is omitted for users with no stored address.
    """
    log.debug("User and email list requested")
    jellyfin_users = jf.getUsers(public=False)
    emails = data_store.emails
    listing = []
    for account in jellyfin_users:
        record = {"name": account["Name"]}
        user_id = account["Id"]
        if user_id in emails:
            record["email"] = emails[user_id]
        listing.append(record)
    return jsonify({"users": listing})
@app.route("/modifyUsers", methods=["POST"])
@auth.login_required
def modifyUsers():
    """Store email addresses for the users named in the JSON request body.

    The body maps each username to its new email address. Every name is
    resolved to a Jellyfin user ID and the address is stored under that ID
    in ``data_store.emails``.

    Returns:
        ``resp()`` when every entry was stored, or ``resp(False)`` as soon
        as a username cannot be found (entries stored before that point are
        kept).
    """
    data = request.get_json()
    log.debug("Email list modification requested")
    for key in data:
        # Handle a missing user the same way setDefaults does, instead of
        # letting the lookup error escape as an unhandled exception.
        try:
            uid = jf.getUsers(key, public=False)["Id"]
        except Jellyfin.UserNotFoundError:
            log.error(f"Email modification failed: Couldn't find user {key}")
            return resp(False)
        data_store.emails[uid] = data[key]
        log.debug(f'Email for user "{key}" modified')
    return resp()
@app.route("/setDefaults", methods=["POST"])
@auth.login_required
def setDefaults():
    """Store an existing user's settings as the defaults for new accounts.

    The JSON request body names the ``username`` to copy from. That user's
    Policy is always stored as ``data_store.user_template``. When
    ``homescreen`` is truthy, the user's Configuration and display
    preferences are stored as well.

    Returns:
        ``resp()`` on success; ``resp(False)`` when the user cannot be found
        or the homescreen layout cannot be stored.
    """
    data = request.get_json()
    username = data["username"]
    log.debug(f"Storing default settings from user {username}")
    try:
        user = jf.getUsers(username=username, public=False)
    except Jellyfin.UserNotFoundError:
        log.error(f"Storing defaults failed: Couldn't find user {username}")
        return resp(False)
    uid = user["Id"]
    data_store.user_template = user["Policy"]
    if data["homescreen"]:
        configuration = user["Configuration"]
        try:
            displayprefs = jf.getDisplayPreferences(uid)
            data_store.user_configuration = configuration
            data_store.user_displayprefs = displayprefs
        except Exception:
            # Catch Exception rather than everything, so SystemExit and
            # KeyboardInterrupt are not swallowed.
            log.error("Storing defaults failed: couldn't store homescreen layout")
            return resp(False)
    return resp()
@app.route("/modifyConfig", methods=["POST"])
@auth.login_required
def modifyConfig():
    """Write submitted settings to the config file and optionally restart.

    The JSON request body maps section names to ``{option: value}`` dicts.
    Sections that exist in the config file are updated, the file is
    rewritten, and ``config.trigger_reload()`` is called. If the body also
    has a truthy ``restart-program`` entry, the current process is replaced
    by a fresh run of the same command line.

    Returns:
        ``resp()`` when no restart was requested. When a restart is
        performed, ``os.execl`` replaces the process and this function does
        not return.
    """
    log.info("Config modification requested")
    data = request.get_json()
    temp_config = configparser.RawConfigParser(
        comment_prefixes="/", allow_no_value=True
    )
    temp_config.read(config_path)
    for section in data:
        # "restart-program" is a control flag, not a config section; compare
        # by equality so only that exact key is excluded.
        if section == "restart-program" or section not in temp_config:
            continue
        for item in data[section]:
            temp_config[section][item] = data[section][item]
            log.debug(f"{section}/{item} modified")
    with open(config_path, "w") as config_file:
        temp_config.write(config_file)
    config.trigger_reload()
    log.info("Config written.")

    if "restart-program" in data and data["restart-program"]:
        log.info("Restarting...")
        # Close this process's open files and connections before replacing
        # it. A failure here is logged and the restart is still attempted.
        try:
            proc = psutil.Process(os.getpid())
            for handler in proc.open_files() + proc.connections():
                os.close(handler.fd)
        except Exception as e:
            log.error(f"Failed restart: {type(e).__name__}")
        python = sys.executable
        os.execl(python, python, *sys.argv)
    return resp()
@app.route("/getConfig", methods=["GET"])
@auth.login_required
def getConfig():
    """Return the base config description with current values filled in.

    Loads the JSON file at ``config_base_path`` and, for every entry that
    also exists in the matching section of the live config, sets that
    entry's ``value`` to the live setting.

    Returns:
        The annotated structure as JSON, with status 200.
    """
    log.debug("Config requested")
    with open(config_base_path, "r") as f:
        described = json.load(f)
    for section, entries in described.items():
        for entry, spec in entries.items():
            if entry in config[section]:
                spec["value"] = config[section][entry]
    return jsonify(described), 200
@app.route("/setNotify", methods=["POST"])
@auth.login_required
def setNotify():
    """Update the current admin's notification settings on invites.

    The JSON request body maps invite codes to dicts that may contain
    ``notify-expiry`` and/or ``notify-creation``; other keys are ignored.
    The flags are stored on the invite under ``notify`` -> admin address.

    Returns:
        ``resp()`` if at least one invite was updated, otherwise
        ``resp(success=False)``.
    """
    data = request.get_json()
    notify_keys = ("notify-expiry", "notify-creation")
    changed = False
    for code in data:
        updates = {key: data[code][key] for key in data[code] if key in notify_keys}
        if not updates:
            continue
        # Load each invite once, apply all of its flags, then write it back,
        # so every submitted code is persisted and no flag is lost to a
        # re-read between keys.
        inv = data_store.invites[code]
        if config.getboolean("ui", "jellyfin_login"):
            address = data_store.emails[g.user.id]
        else:
            address = config["ui"]["email"]
        if "notify" not in inv:
            inv["notify"] = {}
        if address not in inv["notify"]:
            inv["notify"][address] = {}
        inv["notify"][address].update(updates)
        log.debug(f"{code}: Notification settings changed")
        data_store.invites[code] = inv
        changed = True
    if changed:
        return resp()
    return resp(success=False)