jellyfin-accounts/jellyfin_accounts/login.py

121 lines
4.0 KiB
Python

#!/usr/bin/env python3
# from flask import g
from flask_httpauth import HTTPBasicAuth
from itsdangerous import (TimedJSONWebSignatureSerializer
as Serializer, BadSignature, SignatureExpired)
from passlib.apps import custom_app_context as pwd_context
import uuid
from jellyfin_accounts import config, app, g
from jellyfin_accounts import auth_log as log
from jellyfin_accounts.jf_api import Jellyfin
from jellyfin_accounts.web_api import jf
auth_jf = Jellyfin(config['jellyfin']['server'],
config['jellyfin']['client'],
config['jellyfin']['version'],
config['jellyfin']['device'],
config['jellyfin']['device_id'] + '_authClient')
class Account():
def __init__(self, username=None, password=None):
self.username = username
if password is not None:
self.password_hash = pwd_context.hash(password)
self.id = str(uuid.uuid4())
self.jf = False
elif username is not None:
jf.authenticate(config['jellyfin']['username'],
config['jellyfin']['password'])
self.id = jf.getUsers(self.username, public=False)['Id']
self.jf = True
def verify_password(self, password):
if not self.jf:
return pwd_context.verify(password, self.password_hash)
else:
try:
return auth_jf.authenticate(self.username, password)
except Jellyfin.AuthenticationError:
return False
def generate_token(self, expiration=1200):
s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
log.debug(self.id)
return s.dumps({ 'id': self.id })
@staticmethod
def verify_token(token, accounts):
log.debug(f'verifying token {token}')
s = Serializer(app.config['SECRET_KEY'])
try:
data = s.loads(token)
except SignatureExpired:
return None
except BadSignature:
return None
if config.getboolean('ui', 'jellyfin_login'):
for account in accounts:
if data['id'] == accounts[account].id:
return account
else:
return accounts['adminAccount']
auth = HTTPBasicAuth()
accounts = {}
if config.getboolean('ui', 'jellyfin_login'):
log.debug('Using jellyfin for admin authentication')
else:
log.debug('Using configured login details for admin authentication')
accounts['adminAccount'] = Account(config['ui']['username'],
config['ui']['password'])
@auth.verify_password
def verify_password(username, password):
user = None
verified = False
log.debug('Verifying auth')
if config.getboolean('ui', 'jellyfin_login'):
try:
jf_user = jf.getUsers(username, public=False)
id = jf_user['Id']
user = accounts[id]
except KeyError:
if config.getboolean('ui', 'admin_only'):
if jf_user['Policy']['IsAdministrator']:
user = Account(username)
accounts[id] = user
else:
log.debug(f'User {username} not admin.')
return False
else:
user = Account(username)
accounts[id] = user
except Jellyfin.UserNotFoundError:
user = Account().verify_token(username, accounts)
if user:
verified = True
if not user:
log.debug(f'User {username} not found on Jellyfin')
return False
else:
user = accounts['adminAccount']
verified = Account().verify_token(username, accounts)
if not verified:
if username == user.username and user.verify_password(password):
g.user = user
log.debug("HTTPAuth Allowed")
return True
else:
log.debug("HTTPAuth Denied")
return False
g.user = user
log.debug("HTTPAuth Allowed")
return True