mirror of
https://github.com/hrfee/jellyfin-accounts.git
synced 2025-01-03 15:00:14 +00:00
322 lines
11 KiB
Python
322 lines
11 KiB
Python
# Jellyfin API client
|
|
import requests
|
|
import time
|
|
|
|
|
|
class Error(Exception):
|
|
pass
|
|
|
|
|
|
class Jellyfin:
|
|
"""
|
|
Basic Jellyfin API client, providing account related function only.
|
|
"""
|
|
|
|
class UserExistsError(Error):
|
|
"""
|
|
Thrown if a user already exists with the same name
|
|
when creating an account.
|
|
"""
|
|
|
|
pass
|
|
|
|
class UserNotFoundError(Error):
|
|
"""Thrown if account with specified user ID/name does not exist."""
|
|
|
|
pass
|
|
|
|
class AuthenticationError(Error):
|
|
"""Thrown if authentication with Jellyfin fails."""
|
|
|
|
pass
|
|
|
|
class AuthenticationRequiredError(Error):
|
|
"""
|
|
Thrown if privileged action is attempted without authentication.
|
|
"""
|
|
|
|
pass
|
|
|
|
class UnknownError(Error):
|
|
"""
|
|
Thrown if i've been too lazy to figure out an error's meaning.
|
|
"""
|
|
|
|
pass
|
|
|
|
def __init__(self, server, client, version, device, deviceId, cacheMinutes=30):
|
|
"""
|
|
Initializes the Jellyfin object. All parameters except server
|
|
have no effect on the client's capability.
|
|
|
|
:param server: Web address of the server to connect to.
|
|
:param client: Name of the client. Appears on Jellyfin
|
|
server dashboard.
|
|
:param version: Version of the client.
|
|
:param device: Name of the device the client is running on.
|
|
:param deviceId: ID of the device the client is running on.
|
|
"""
|
|
self.server = server
|
|
self.client = client
|
|
self.version = version
|
|
self.device = device
|
|
self.deviceId = deviceId
|
|
self.authenticated = False
|
|
self.timeout = cacheMinutes * 60
|
|
self.userCacheAge = time.time() - self.timeout - 1
|
|
self.userCachePublicAge = self.userCacheAge
|
|
self.useragent = f"{self.client}/{self.version}"
|
|
self.auth = "MediaBrowser "
|
|
self.auth += f"Client={self.client}, "
|
|
self.auth += f"Device={self.device}, "
|
|
self.auth += f"DeviceId={self.deviceId}, "
|
|
self.auth += f"Version={self.version}"
|
|
self.header = {
|
|
"Accept": "application/json",
|
|
"Content-type": "application/x-www-form-urlencoded; charset=UTF-8",
|
|
"X-Application": f"{self.client}/{self.version}",
|
|
"Accept-Charset": "UTF-8,*",
|
|
"Accept-encoding": "gzip",
|
|
"User-Agent": self.useragent,
|
|
"X-Emby-Authorization": self.auth,
|
|
}
|
|
try:
|
|
self.info = requests.get(f"{self.server}/System/Info/Public").json()
|
|
except:
|
|
pass
|
|
|
|
def reloadCache(self):
|
|
""" Forces a reload of the user caches """
|
|
self.userCachePublicAge = time.time() - self.timeout - 1
|
|
self.getUsers()
|
|
try:
|
|
self.userCacheAge = self.userCachePublicAge
|
|
self.getUsers(public=False)
|
|
except self.AuthenticationRequiredError:
|
|
pass
|
|
|
|
def getUsers(self, username: str = "all", userId: str = "all", public: bool = True):
|
|
"""
|
|
Returns details on user(s), such as ID, Name, Policy.
|
|
|
|
:param username: (optional) Username to get info about.
|
|
Leave blank to get all users.
|
|
:param userId: (optional) User ID to get info about.
|
|
Leave blank to get all users.
|
|
:param public: True = Get publicly visible users only (no auth required),
|
|
False = Get all users (auth required).
|
|
"""
|
|
if public is True:
|
|
if (time.time() - self.userCachePublicAge) >= self.timeout:
|
|
response = requests.get(f"{self.server}/Users/Public").json()
|
|
self.userCachePublic = response
|
|
self.userCachePublicAge = time.time()
|
|
else:
|
|
response = self.userCachePublic
|
|
elif (
|
|
public is False and hasattr(self, "username") and hasattr(self, "password")
|
|
):
|
|
if (time.time() - self.userCacheAge) >= self.timeout:
|
|
response = requests.get(
|
|
f"{self.server}/Users",
|
|
headers=self.header,
|
|
params={"Username": self.username, "Pw": self.password},
|
|
)
|
|
if response.status_code == 200:
|
|
response = response.json()
|
|
self.userCache = response
|
|
self.userCacheAge = time.time()
|
|
elif response.status_code == 401:
|
|
try:
|
|
self.authenticate(self.username, self.password)
|
|
return self.getUsers(username, userId, public)
|
|
except self.AuthenticationError:
|
|
raise self.AuthenticationRequiredError
|
|
else:
|
|
response = self.userCache
|
|
else:
|
|
raise self.AuthenticationRequiredError
|
|
if username == "all" and userId == "all":
|
|
return response
|
|
elif userId == "all":
|
|
match = False
|
|
for user in response:
|
|
if user["Name"] == username:
|
|
match = True
|
|
return user
|
|
if not match:
|
|
raise self.UserNotFoundError
|
|
else:
|
|
match = False
|
|
for user in response:
|
|
if user["Id"] == userId:
|
|
match = True
|
|
return user
|
|
if not match:
|
|
raise self.UserNotFoundError
|
|
|
|
def authenticate(self, username: str, password: str):
|
|
"""
|
|
Authenticates by name with Jellyfin.
|
|
|
|
:param username: Plaintext username.
|
|
:param password: Plaintext password.
|
|
"""
|
|
response = requests.post(
|
|
f"{self.server}/Users/AuthenticateByName",
|
|
headers=self.header,
|
|
params={"Username": username, "Pw": password},
|
|
)
|
|
if response.status_code == 200:
|
|
json = response.json()
|
|
self.userId = json["User"]["Id"]
|
|
self.accessToken = json["AccessToken"]
|
|
self.auth = "MediaBrowser "
|
|
self.auth += f"Client={self.client}, "
|
|
self.auth += f"Device={self.device}, "
|
|
self.auth += f"DeviceId={self.deviceId}, "
|
|
self.auth += f"Version={self.version}"
|
|
self.auth += f", Token={self.accessToken}"
|
|
self.header["X-Emby-Authorization"] = self.auth
|
|
self.info = requests.get(
|
|
f"{self.server}/System/Info", headers=self.header
|
|
).json()
|
|
self.username = username
|
|
self.password = password
|
|
self.authenticated = True
|
|
return True
|
|
else:
|
|
raise self.AuthenticationError
|
|
|
|
def setPolicy(self, userId: str, policy: dict):
|
|
"""
|
|
Sets a user's policy (Admin rights, Library Access, etc.) by user ID.
|
|
|
|
:param userId: ID of the user to modify.
|
|
:param policy: User policy in dictionary form.
|
|
"""
|
|
return requests.post(
|
|
f"{self.server}/Users/" + userId + "/Policy",
|
|
headers=self.header,
|
|
params=policy,
|
|
)
|
|
|
|
def newUser(self, username: str, password: str):
|
|
for user in self.getUsers(public=False):
|
|
if user["Name"] == username:
|
|
raise self.UserExistsError
|
|
response = requests.post(
|
|
f"{self.server}/Users/New",
|
|
headers=self.header,
|
|
params={"Name": username, "Password": password},
|
|
)
|
|
if response.status_code == 401:
|
|
if hasattr(self, "username") and hasattr(self, "password"):
|
|
self.authenticate(self.username, self.password)
|
|
return self.newUser(username, password)
|
|
else:
|
|
raise self.AuthenticationRequiredError
|
|
return response
|
|
|
|
def getViewOrder(self, userId: str, public: bool = True):
|
|
if not public:
|
|
param = "?IncludeHidden=true"
|
|
else:
|
|
param = ""
|
|
views = requests.get(
|
|
f"{self.server}/Users/" + userId + "/Views" + param, headers=self.header
|
|
).json()["Items"]
|
|
orderedViews = []
|
|
for library in views:
|
|
orderedViews.append(library["Id"])
|
|
return orderedViews
|
|
|
|
def setConfiguration(self, userId: str, configuration: dict):
|
|
"""
|
|
Sets a user's configuration (Settings the user can change themselves).
|
|
:param userId: ID of the user to modify.
|
|
:param configuration: Configuration to write in dictionary form.
|
|
"""
|
|
resp = requests.post(
|
|
f"{self.server}/Users/" + userId + "/Configuration",
|
|
headers=self.header,
|
|
params=configuration,
|
|
)
|
|
if resp.status_code == 200 or resp.status_code == 204:
|
|
return True
|
|
elif resp.status_code == 401:
|
|
if hasattr(self, "username") and hasattr(self, "password"):
|
|
self.authenticate(self.username, self.password)
|
|
return self.setConfiguration(userId, configuration)
|
|
else:
|
|
raise self.AuthenticationRequiredError
|
|
else:
|
|
raise self.UnknownError
|
|
|
|
def getConfiguration(self, username: str = "all", userId: str = "all"):
|
|
"""
|
|
Gets a user's Configuration. This can also be found in getUsers if
|
|
public is set to False.
|
|
:param username: The user's username.
|
|
:param userId: The user's ID.
|
|
"""
|
|
return self.getUsers(username=username, userId=userId, public=False)[
|
|
"Configuration"
|
|
]
|
|
|
|
def getDisplayPreferences(self, userId: str):
|
|
"""
|
|
Gets a user's Display Preferences (Home layout).
|
|
:param userId: The user's ID.
|
|
"""
|
|
resp = requests.get(
|
|
(
|
|
self.server
|
|
+ "/DisplayPreferences/usersettings"
|
|
+ "?userId="
|
|
+ userId
|
|
+ "&client=emby"
|
|
),
|
|
headers=self.header,
|
|
)
|
|
if resp.status_code == 200:
|
|
return resp.json()
|
|
elif resp.status_code == 401:
|
|
if hasattr(self, "username") and hasattr(self, "password"):
|
|
self.authenticate(self.username, self.password)
|
|
return self.getDisplayPreferences(userId)
|
|
else:
|
|
raise self.AuthenticationRequiredError
|
|
else:
|
|
raise self.UnknownError
|
|
|
|
def setDisplayPreferences(self, userId: str, preferences: dict):
|
|
"""
|
|
Sets a user's Display Preferences (Home layout).
|
|
:param userId: The user's ID.
|
|
:param preferences: The preferences to set.
|
|
"""
|
|
tempheader = self.header
|
|
tempheader["Content-type"] = "application/json"
|
|
resp = requests.post(
|
|
(
|
|
self.server
|
|
+ "/DisplayPreferences/usersettings"
|
|
+ "?userId="
|
|
+ userId
|
|
+ "&client=emby"
|
|
),
|
|
headers=tempheader,
|
|
json=preferences,
|
|
)
|
|
if resp.status_code == 200 or resp.status_code == 204:
|
|
return True
|
|
elif resp.status_code == 401:
|
|
if hasattr(self, "username") and hasattr(self, "password"):
|
|
self.authenticate(self.username, self.password)
|
|
return self.setDisplayPreferences(userId, preferences)
|
|
else:
|
|
raise self.AuthenticationRequiredError
|
|
else:
|
|
return resp
|