"""
"""
import asyncio
import logging
import types
from abc import ABCMeta
from ssl import SSLContext
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Mapping,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Union,
overload,
)
import aiohttp
from aiohttp.hdrs import RETRY_AFTER as _RETRY_AFTER
from typing_extensions import Self
from yarl import URL
from aioqbt import exc
from aioqbt._compat import cached_property
from aioqbt._paramdict import ParamDict
from aioqbt.mapper import ObjectMapper
from aioqbt.version import APIVersion, ClientVersion
if TYPE_CHECKING:
from aioqbt.api import (
AppAPI,
AuthAPI,
LogAPI,
RSSAPI,
SearchAPI,
SyncAPI,
TorrentsAPI,
TransferAPI,
)
__all__ = (
"APIClient",
"APIGroup",
"create_client",
)
T = TypeVar("T")
K = TypeVar("K")
[docs]
class APIClient:
"""
Represent a remote qBittorrent client.
"""
def __init__(
self,
base_url: str,
*,
mapper: Optional[ObjectMapper] = None,
http: Optional[aiohttp.ClientSession] = None,
ssl: Optional[SSLContext] = None,
client_version: Optional[ClientVersion] = None,
api_version: Optional[APIVersion] = None,
logout_when_close: Optional[bool] = None,
logger: Optional[logging.Logger] = None,
) -> None:
if logger is None:
logger = logging.getLogger(
"%s.%s" % (type(self).__module__, type(self).__qualname__),
)
base_url = base_url.rstrip("/")
if mapper is None:
mapper = ObjectMapper()
if http is None:
http = aiohttp.ClientSession()
http_owner = True
else:
http_owner = False
context: Dict[Any, Any] = {
"mapper": mapper,
"client_version": client_version,
"api_version": api_version,
}
self._logger = logger
self._mapper = mapper
self._context = context
self._http: Optional[aiohttp.ClientSession] = http
self._http_owner = http_owner
self._ssl = ssl
self.base_url = base_url
self._retry_statuses = {
429, # Too many requests
503, # Service unavailable
502, # Bad gateway: reverse proxy may be overloaded
}
self._logout_when_close = logout_when_close
def __repr__(self) -> str:
return f"<{type(self).__name__} {self.base_url!r}>"
async def __aenter__(self) -> Self:
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[types.TracebackType],
) -> None:
await self.close()
[docs]
def is_closed(self) -> bool:
"""Tell whether client is closed"""
return self._http is None
@property
def closed(self) -> bool:
# deprecated, prefer `is_closed()` to `closed` property
return self._http is None
[docs]
async def close(self) -> None:
"""
Close client.
Release/detach resources acquired by client.
"""
if self._http is None:
return
if self._logout_when_close:
try:
await self.auth.logout()
except exc.ForbiddenError:
pass
# break cycle references to help GC.
vars_dict = vars(self)
for name in ("app", "auth", "log", "sync", "torrents", "transfer"):
if name not in vars_dict:
continue
group = getattr(self, name, None)
if isinstance(group, APIGroup):
group._close()
vars_dict.pop(name, None)
# Close if owned
if self._http_owner:
await self._http.close()
# Detach ClientSession
self._http = None
[docs]
async def request(
self,
method: str,
endpoint: str,
*,
params: Any = None,
data: Any = None,
max_attempts: int = 3,
retry_delay: float = 5,
ssl: Optional[SSLContext] = None,
raise_for_status: bool = True,
**kwargs: object,
) -> aiohttp.ClientResponse:
"""
Send an HTTP request and return a response object.
Argument ``method`` specifies the HTTP method (e.g. ``GET``)
while ``endpoint`` the API endpoint (e.g. ``torrents/info``).
``params`` forms the query string of the request URL.
``data`` is the payload in the request body.
See the underlying :meth:`ClientSession.request <aiohttp.ClientSession.request>`
for their allowed values.
``max_attempts`` is the maximum number of attempts.
Retry is performed if two additional conditions are satisfied:
* ``GET`` or ``HEAD`` requets
* Remote disconnection, or repsonse status
429 (Too many requests), 503 (Service unavailable), or 502 (Bad gateway).
The result is :class:`aiohttp.ClientResponse`, and should be used in ``async with``.
:param str method: HTTP method.
:param str endpoint: API endpoint.
:param params: parameters in query string
:param data: data in request body
:param int max_attempts: maximum number of attempts
:param float retry_delay: maximum delay between attempts
:param ssl: :class:`~ssl.SSLContext`, optional
:raise APIError: API errors (non-``200`` status).
:raise aiohttp.ClientError: connection errors
:return: :class:`~aiohttp.ClientResponse`
"""
if max_attempts <= 0:
raise ValueError(f"max_attempts <= 0: {max_attempts!r}")
elif method.upper() not in ("GET", "HEAD"):
max_attempts = 1
if self._http is None:
raise RuntimeError("closed client")
url = self.base_url + "/" + endpoint.lstrip("/")
if isinstance(params, ParamDict):
params = params.to_dict()
if isinstance(data, ParamDict):
data = data.to_dict()
if ssl is None:
ssl = self._ssl
attempt_count = 1
while True:
resp: Optional[aiohttp.ClientResponse] = None
resp_body: bytes = b""
try:
resp = await self._http.request(
method,
url,
params=params,
data=data,
ssl=ssl,
raise_for_status=False,
**kwargs,
)
if 200 <= resp.status < 300:
return resp
# treat all status outside 2xx range an error
# Read the response before release the response
resp_body = await resp.read()
resp.release()
raise aiohttp.ClientResponseError(
resp.request_info,
resp.history,
status=resp.status,
message=str(resp.reason),
headers=resp.headers,
)
except (
aiohttp.ServerDisconnectedError,
aiohttp.ClientResponseError,
) as ex:
self._logger.warning(
"Request %d/%d: %s %r",
attempt_count,
max_attempts,
method,
_real_url(url, params),
exc_info=True,
)
last_exc = ex
if attempt_count < max_attempts:
should_retry, sleeping_time = self._retry_strategy(retry_delay, last_exc, resp)
if should_retry:
attempt_count += 1
await asyncio.sleep(sleeping_time)
continue
if not raise_for_status and resp is not None:
return resp
self._handle_error(last_exc, resp, resp_body)
def _retry_strategy(
self,
retry_pause: float,
ex: BaseException,
resp: Optional[aiohttp.ClientResponse],
) -> Tuple[bool, float]:
"""
Ruturn a tuple of a bool and a sleeping time.
The bool indicate whether retry should be made.
"""
if resp is None:
# The issues are related to sockets or network connections.
if isinstance(ex, aiohttp.ServerDisconnectedError):
# retry if TCP was probably reset
return True, retry_pause
return False, 0
if resp.status not in self._retry_statuses:
return False, 0
retry_after: Optional[int] = None
if _RETRY_AFTER in resp.headers:
# the remote seems busy or in maintenance
try:
# Support second format only
retry_after = int(resp.headers[_RETRY_AFTER])
if retry_after < 0:
retry_after = None
except ValueError:
# Date format is not supported
# It usually suggests relatively long unavailability.
return False, 0
if retry_after is None:
return True, retry_pause
elif retry_after <= retry_pause:
# below our expected limit
return True, retry_after
else:
return False, 0
def _handle_error(
self,
error: Exception,
resp: Optional[aiohttp.ClientResponse],
resp_body: bytes,
) -> NoReturn:
"""
Handle errors which are not retryable.
"""
if resp is None:
# raise the last error if no resp available
raise error
try:
message = resp_body.decode("utf-8", "strict")
except UnicodeDecodeError:
message = ""
exc_class = exc._ERROR_TABLE.get(resp.status, exc.APIError)
raise exc_class.from_response(resp, message) from error
[docs]
async def request_text(
self,
method: str,
endpoint: str,
**kwargs: Any,
) -> str:
"""
Send a request and return a str.
"""
resp = await self.request(method, endpoint, **kwargs)
async with resp:
return await resp.text(encoding="utf-8")
[docs]
async def request_json(
self,
method: str,
endpoint: str,
**kwargs: Any,
) -> Any:
"""
Send a request and return a JSON-decoded object.
"""
resp = await self.request(method, endpoint, **kwargs)
async with resp:
result = await resp.json()
return result
def _create_object(self, rtype: Type[T], data: Mapping[str, Any]) -> T:
return self._mapper.create_object(rtype, data, self._context)
def _create_list(self, rtype: Type[T], data: Sequence[Mapping[str, Any]]) -> List[T]:
return self._mapper.create_list(rtype, data, self._context)
def _create_dict(self, rtype: Type[T], data: Mapping[K, Mapping[str, Any]]) -> Dict[K, T]:
return self._mapper.create_dict(rtype, data, self._context)
@property
def client_version(self) -> Optional[ClientVersion]:
"""qBittorrent client version"""
return self._context.get("client_version")
@client_version.setter
def client_version(self, version: Optional[ClientVersion]) -> None:
self._context["client_version"] = version
@property
def api_version(self) -> Optional[APIVersion]:
"""qBittorrent API version"""
return self._context.get("api_version")
@api_version.setter
def api_version(self, version: Optional[APIVersion]) -> None:
self._context["api_version"] = version
@cached_property
def app(self) -> "AppAPI":
"""
Application API methods.
"""
from aioqbt.api.app import AppAPI
return AppAPI(self)
@cached_property
def auth(self) -> "AuthAPI":
"""
Authentication API methods.
"""
from aioqbt.api.auth import AuthAPI
return AuthAPI(self)
@cached_property
def log(self) -> "LogAPI":
"""
Log API methods.
"""
from aioqbt.api.log import LogAPI
return LogAPI(self)
@cached_property
def rss(self) -> "RSSAPI":
"""
RSS API methods.
"""
from aioqbt.api.rss import RSSAPI
return RSSAPI(self)
@cached_property
def search(self) -> "SearchAPI":
"""
Search API methods.
"""
from aioqbt.api.search import SearchAPI
return SearchAPI(self)
@cached_property
def sync(self) -> "SyncAPI":
"""
Sync API methods.
"""
from aioqbt.api.sync import SyncAPI
return SyncAPI(self)
@cached_property
def torrents(self) -> "TorrentsAPI":
"""
Torrents API methods.
"""
from aioqbt.api.torrents import TorrentsAPI
return TorrentsAPI(self)
@cached_property
def transfer(self) -> "TransferAPI":
"""
Transfer API methods.
"""
from aioqbt.api.transfer import TransferAPI
return TransferAPI(self)
[docs]
class APIGroup(metaclass=ABCMeta):
"""
API group of methods.
"""
_client_ref: Optional["APIClient"] = None
def __init__(self, client: "APIClient"):
# do not keep the reference if closed
self._client_ref = None if client.is_closed() else client
def _client(self) -> "APIClient":
client = self._client_ref
if client is None:
raise RuntimeError("closed client")
return client
def _close(self) -> None:
self._client_ref = None
async def _request(
self,
method: str,
endpoint: str,
**kwargs: Any,
) -> aiohttp.ClientResponse:
return await self._client().request(method, endpoint, **kwargs)
async def _request_text(
self,
method: str,
endpoint: str,
**kwargs: Any,
) -> str:
return await self._client().request_text(method, endpoint, **kwargs)
async def _request_json(
self,
method: str,
endpoint: str,
**kwargs: Any,
) -> Any:
return await self._client().request_json(method, endpoint, **kwargs)
async def _request_mapped_object(
self,
rtype: Type[T],
method: str,
endpoint: str,
**kwargs: Any,
) -> T:
client = self._client()
result = await client.request_json(method, endpoint, **kwargs)
return client._create_object(rtype, result)
async def _request_mapped_list(
self,
rtype: Type[T],
method: str,
endpoint: str,
**kwargs: Any,
) -> List[T]:
client = self._client()
result = await client.request_json(method, endpoint, **kwargs)
return client._create_list(rtype, result)
async def _request_mapped_dict(
self,
rtype: Type[T],
method: str,
endpoint: str,
**kwargs: Any,
) -> Dict[str, T]:
client = self._client()
result = await client.request_json(method, endpoint, **kwargs)
return client._create_dict(rtype, result)
def _real_url(url: str, params: Optional[Mapping[str, Any]] = None) -> str:
url_obj = URL(url)
if params is not None:
url_obj = url_obj.with_query(params)
return str(url_obj)
[docs]
async def create_client(
url: str,
username: Optional[str] = None,
password: Optional[str] = None,
*,
logout_when_close: Optional[bool] = None,
http: Optional[aiohttp.ClientSession] = None,
ssl: Optional[SSLContext] = None,
) -> APIClient:
"""
Create :class:`APIClient`.
When both ``username`` and ``password`` are given,
the returned client will have been successfully authenticated and automatically configured.
Otherwise, :exc:`LoginError <aioqbt.exc.LoginError>` is raised.
If they are omitted, :meth:`client.auth.login() <.AuthAPI.login>` need to be called manually.
If the URL host is expresed in IP address instead of domain name,
session cookies are not preserved properly and
subsequent requests result in :exc:`~.ForbiddenError`.
See :issue:`GH-2 <2#issuecomment-1925461178>` for details.
:param str url: URL to WebUI API, for example, ``https://localhost:8080/api/v2/``
:param str username: login name
:param str password: login password
:param logout_when_close: whether logout during :meth:`~.APIClient.close`.
:param http: :class:`aiohttp.ClientSession` object
:param ssl: :class:`ssl.SSLContext` for custom TLS connections
:raises LoginError: if authentication is failed.
"""
if (username is None) != (password is None):
raise TypeError("Specify both username and password arguments, or neither of them")
if logout_when_close is None:
logout_when_close = username is not None
mapper = ObjectMapper()
client = APIClient(
base_url=url,
mapper=mapper,
http=http,
ssl=ssl,
logout_when_close=logout_when_close,
)
if username is not None:
assert isinstance(password, str)
try:
await client.auth.login(username, password)
except exc.LoginError:
await client.close()
raise
try:
client_version, api_version = await asyncio.gather(
client.app.version(),
client.app.webapi_version(),
)
except exc.ForbiddenError:
pass
else:
client.client_version = ClientVersion.parse(client_version)
client.api_version = APIVersion.parse(api_version)
return client
#
# def _find_localtime():
# from datetime import timedelta, timezone
# from time import localtime
#
# tm = localtime()
# return timezone(timedelta(seconds=tm.tm_gmtoff), tm.tm_zone)
#
#
# _LOCAL_TIMEZONE = _find_localtime()
def since(version: Union[APIVersion, Tuple[int, int, int]]) -> Callable[[T], T]:
"""
Annotate function with API version.
"""
if not isinstance(version, APIVersion):
version = APIVersion(*version)
def decorator(fn: T) -> T:
fn._api_version = version # type: ignore[attr-defined]
return fn
return decorator
@overload
def virtual(fn: None) -> Callable[[T], T]: ...
@overload
def virtual(fn: T) -> T: ...
def virtual(fn: Any = None) -> Any:
"""
Mark function not backed by endpoint.
"""
if fn is None:
return virtual # pragma: no cover
else:
return fn