import dataclasses
import os
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Union, overload
import aiohttp
from typing_extensions import Self
from aioqbt import exc
from aioqbt._decorator import copy_self
from aioqbt._paramdict import ParamDict
from aioqbt.api.types import (
Category,
ContentLayout,
FileEntry,
InfoFilter,
RatioLimits,
SeedingTimeLimits,
ShareLimitAction,
StopCondition,
TorrentInfo,
TorrentProperties,
TorrentSSLParameters,
Tracker,
WebSeed,
)
from aioqbt.bittorrent import InfoHash, InfoHashes, InfoHashesOrAll, _info_hash_str
from aioqbt.chrono import Minutes, TimeUnit
from aioqbt.client import APIClient, APIGroup, since, virtual
from aioqbt.typing import StrPath
from aioqbt.version import APIVersion, ClientVersion
__all__ = (
"AddFormBuilder",
"TorrentsAPI",
)
def _check_iterable_except_str(param: str, value: Iterable[Any]) -> None:
"""Explicitly reject ``str`` as ``Iterable[str]``"""
if isinstance(value, str): # pragma: no cover
raise ValueError(f"{param!r} refused str as iterable")
def _adapt_info_filter(
    filter: str,
    api_version: Optional[Tuple[int, int, int]],
) -> str:
    """
    Translate an info filter to the name understood by the connected server.

    API v2.11.0 renamed two filters: ``RESUMED`` became ``RUNNING`` and
    ``PAUSED`` became ``STOPPED``.  On API v2.11.0 or later the old names
    are mapped to the new ones and a :class:`DeprecationWarning` is issued;
    on earlier versions the new names are silently mapped back to the old
    ones.  Any other filter is returned unchanged.

    :param filter: Filter value supplied by the caller.
    :param api_version: API version handed to ``APIVersion.compare``.
    :return: The filter value to send.
    """
    renamed_api = APIVersion.compare(api_version, (2, 11, 0)) >= 0

    if not renamed_api:
        # Server predates the rename: fall back to the legacy spelling.
        if filter == InfoFilter.RUNNING:
            return InfoFilter.RESUMED
        if filter == InfoFilter.STOPPED:
            return InfoFilter.PAUSED
        return filter

    if filter == InfoFilter.RESUMED:
        notice = "Please migrate RESUMED to RUNNING in qBittorrent v5"
        replacement = InfoFilter.RUNNING
    elif filter == InfoFilter.PAUSED:
        notice = "Please migrate PAUSED to STOPPED in qBittorrent v5"
        replacement = InfoFilter.STOPPED
    else:
        return filter

    import warnings

    # stacklevel=3: skip this helper and the API method that called it
    warnings.warn(notice, DeprecationWarning, stacklevel=3)
    return replacement
[docs]
class TorrentsAPI(APIGroup):
    """
    API methods under ``torrents``.

    The coroutine methods of this group wrap endpoints whose paths start
    with ``torrents/``.
    """
[docs]
async def count(self) -> int:
    """
    Return the number of torrents.

    The text body of ``torrents/count`` is parsed as an integer.
    """
    # endpoint is new in v4.6.1
    text = await self._request_text("GET", "torrents/count")
    return int(text)
[docs]
async def info(
    self,
    filter: Optional[str] = None,
    category: Optional[str] = None,
    sort: Optional[str] = None,
    reverse: Optional[bool] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    hashes: Optional[InfoHashesOrAll] = None,
    tag: Optional[str] = None,  # since API v2.8.3
) -> List[TorrentInfo]:
    """
    Query torrents and return them as a list of :class:`.TorrentInfo`.

    For example, completed torrents sorted by name::

        torrents = await client.torrents.info(
            filter=InfoFilter.COMPLETED,
            sort="name",
        )

    See also :APIWiki:`torrents/info <#get-torrent-list>`
    for filter and result meanings.

    :param filter: State filter: :class:`.InfoFilter` or ``str``.
        Renamed filters are adapted to the connected API version.
    :param category: Category filter.
    :param sort: Attribute/field to sort results by.
    :param reverse: Reverse the results.
    :param limit: Maximum number of returned results.
    :param offset: Results starting from the ``offset``-th torrents.
    :param hashes: A list of info hashes, or a str ``all``.
    :param tag: Tag filter.
    """
    if filter is not None:
        # Called directly from this method so that the deprecation
        # warning is attributed to the caller of info().
        adapted = _adapt_info_filter(filter, self._client().api_version)
        filter = str(adapted)

    query = ParamDict() if hashes is None else ParamDict.with_hashes(hashes)

    query.optional_str("filter", filter)
    query.optional_str("category", category)
    query.optional_str("sort", sort)
    query.optional_bool("reverse", reverse)
    query.optional_int("limit", limit)
    query.optional_int("offset", offset)

    if tag is not None:
        # API 2.8.3
        query.optional_str("tag", tag)

    return await self._request_mapped_list(
        TorrentInfo,
        "GET",
        "torrents/info",
        params=query,
    )
[docs]
async def properties(self, hash: InfoHash) -> TorrentProperties:
    """
    Get properties of a torrent.

    If the mapped result carries no ``hash`` attribute, it is filled in
    from the ``hash`` argument.
    """
    query = ParamDict.with_hash(hash)
    result = await self._request_mapped_object(
        TorrentProperties,
        "GET",
        "torrents/properties",
        params=query,
    )
    if hasattr(result, "hash"):
        return result

    result.hash = _info_hash_str(hash)
    return result
[docs]
async def trackers(self, hash: InfoHash) -> List[Tracker]:
    """Return the trackers of a torrent as :class:`.Tracker` objects."""
    # Tracker's status may be a string or int, API v2.2.0
    query = ParamDict.with_hash(hash)
    return await self._request_mapped_list(
        Tracker,
        "GET",
        "torrents/trackers",
        params=query,
    )
[docs]
async def webseeds(self, hash: InfoHash) -> List[WebSeed]:
    """Return the web seeds of a torrent as :class:`.WebSeed` objects."""
    query = ParamDict.with_hash(hash)
    return await self._request_mapped_list(
        WebSeed,
        "GET",
        "torrents/webseeds",
        params=query,
    )
[docs]
async def files(
    self,
    hash: InfoHash,
    indexes: Optional[Iterable[int]] = None,
) -> List[FileEntry]:
    """
    Return the files of a torrent as :class:`.FileEntry` objects.

    :param hash: Info hash.
    :param indexes: Optional file indexes to restrict the result to;
        sent ``|``-separated.
    """
    query = ParamDict.with_hash(hash)
    if indexes is not None:
        # API 2.8.2
        query.optional_list("indexes", indexes, "|")

    return await self._request_mapped_list(
        FileEntry,
        "GET",
        "torrents/files",
        params=query,
    )
[docs]
async def piece_states(self, hash: InfoHash) -> List[int]:
    """
    Return the piece states of a torrent as a list of integers.

    Compare the values against the :class:`~.PieceState` enum:

    * :attr:`.PieceState.UNAVAILABLE`
    * :attr:`.PieceState.DOWNLOADING`
    * :attr:`.PieceState.DOWNLOADED`
    """
    query = ParamDict.with_hash(hash)
    states = await self._request_json(
        "GET",
        "torrents/pieceStates",
        params=query,
    )
    return states  # type: ignore[no-any-return]
[docs]
async def piece_hashes(self, hash: InfoHash) -> List[str]:
    """
    Return the piece hashes of a torrent as a list of strings.
    """
    query = ParamDict.with_hash(hash)
    hashes = await self._request_json(
        "GET",
        "torrents/pieceHashes",
        params=query,
    )
    return hashes  # type: ignore[no-any-return]
[docs]
async def stop(self, hashes: InfoHashesOrAll) -> None:
    """
    Stop torrents.

    Torrents are selected by their info hashes; pass ``all`` to stop
    every torrent.

    ``torrents/stop`` is used on API v2.11.0 or later,
    ``torrents/pause`` on earlier versions.
    """
    api_version = self._client().api_version
    renamed = APIVersion.compare(api_version, (2, 11, 0)) >= 0
    endpoint = "torrents/stop" if renamed else "torrents/pause"

    await self._request_text(
        "POST",
        endpoint,
        data=ParamDict.with_hashes_or_all(hashes),
    )
[docs]
async def pause(self, hashes: InfoHashesOrAll) -> None:
    """Alias of :meth:`.stop`."""
    await self.stop(hashes)
[docs]
async def start(self, hashes: InfoHashesOrAll) -> None:
    """
    Start torrents.

    Torrents are selected by their info hashes; pass ``all`` to start
    every torrent.

    ``torrents/start`` is used on API v2.11.0 or later,
    ``torrents/resume`` on earlier versions.
    """
    api_version = self._client().api_version
    renamed = APIVersion.compare(api_version, (2, 11, 0)) >= 0
    endpoint = "torrents/start" if renamed else "torrents/resume"

    await self._request_text(
        "POST",
        endpoint,
        data=ParamDict.with_hashes_or_all(hashes),
    )
[docs]
async def resume(self, hashes: InfoHashesOrAll) -> None:
    """Alias of :meth:`.start`."""
    await self.start(hashes)
[docs]
async def delete(self, hashes: InfoHashesOrAll, delete_files: bool) -> None:
    """
    Delete torrents.

    Torrents are selected by their info hashes; pass ``all`` to delete
    every torrent.  Any other bare ``str`` is rejected.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param delete_files: ``True`` to also remove downloaded content.
    :raises ValueError: if ``hashes`` is a ``str`` other than ``all``.
    """
    wants_all = hashes == "all"
    if not wants_all:
        _check_iterable_except_str("hashes", hashes)

    form = ParamDict.with_hashes_or_all(hashes)
    form.required_bool("deleteFiles", delete_files)  # default to False

    await self._request_text("POST", "torrents/delete", data=form)
[docs]
async def recheck(self, hashes: InfoHashesOrAll) -> None:
    """
    Recheck torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/recheck", data=form)
[docs]
async def reannounce(self, hashes: InfoHashesOrAll) -> None:
    """
    Reannounce torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    # since API v2.0.2
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/reannounce", data=form)
[docs]
async def add(self, form: aiohttp.FormData) -> None:
    """
    Add torrents by URLs, info hashes, and/or file blobs.

    See :class:`.AddFormBuilder` on how to configure and build
    :class:`~aiohttp.FormData` to submit.

    .. note::

        :exc:`~.exc.AddTorrentError` may raise if no *new* torrents
        are added.

    :param form: form data to submit.
    :raises ~.exc.AddTorrentError: if the response status is outside
        2xx, or the status is 200 with a body of ``Fails.``.  The
        response body is attached as the exception message.
    """
    # raise_for_status is disabled so that failures can be reported as
    # AddTorrentError carrying the response body.
    resp = await self._request(
        "POST",
        "torrents/add",
        data=form,
        raise_for_status=False,
    )

    async with resp:
        try:
            # body should have been cached in the aiohttp response
            body = await resp.read()
        except aiohttp.ClientConnectionError:
            body = b""

        # before 5.2.0, the response is status of 200 and a content of either "Ok." or "Fails."
        if not 200 <= resp.status < 300 or (resp.status == 200 and body == b"Fails."):
            ex = exc.AddTorrentError.from_response(resp)
            # Decode leniently: an error body that is not valid UTF-8
            # must not replace AddTorrentError with UnicodeDecodeError.
            ex.message = body.decode("utf-8", errors="replace")
            raise ex

    # TODO Support the add result since v5.2.0
[docs]
async def add_trackers(self, hash: InfoHash, trackers: Iterable[str]) -> None:
    """
    Add trackers to a torrent.

    :param hash: Info hash.
    :param trackers: Tracker URLs, sent newline-separated.
        A bare ``str`` is rejected.
    :raises ValueError: if ``trackers`` is a ``str``.
    """
    _check_iterable_except_str("trackers", trackers)

    form = ParamDict.with_hash(hash)
    form.required_list("urls", trackers, "\n")

    await self._request_text("POST", "torrents/addTrackers", data=form)
# Typing-only signatures accepted by edit_tracker().

# Replace a tracker URL, positional form.
@overload
async def edit_tracker(self, hash: InfoHash, url: str, new_url: str) -> None: ...

# Identify the tracker by ``url``; ``new_url`` and ``tier`` are both optional.
@overload
async def edit_tracker(
    self,
    hash: InfoHash,
    url: str,
    new_url: Optional[str] = None,
    tier: Optional[int] = None,
) -> None: ...

# Keyword form using ``orig_url`` in place of ``url``.
@overload
async def edit_tracker(self, hash: InfoHash, *, orig_url: str, new_url: str) -> None: ...
[docs]
@since((2, 2, 0))
async def edit_tracker(
    self,
    hash: InfoHash,
    url: Optional[str] = None,
    new_url: Optional[str] = None,
    tier: Optional[int] = None,
    *,
    orig_url: Optional[str] = None,
) -> None:
    """
    Edit a tracker of a torrent.

    The tracker is identified by ``url``, or by the keyword ``orig_url``
    when ``url`` is not given.  Before API v2.13.0 the URL is sent as
    ``origUrl`` and ``tier`` is not sent; from API v2.13.0 it is sent as
    ``url`` together with the optional ``tier``.

    :param hash: Info hash.
    :param url: URL of the tracker to edit.
    :param new_url: Replacement URL.
    :param tier: Tracker tier; only sent on API v2.13.0 or later.
    :param orig_url: Alternative keyword for ``url``.
    :raises TypeError: if neither ``url`` nor ``orig_url`` is given.
    """
    if url is None:
        if orig_url is None:
            raise TypeError("url must be provided")
        # orig_url was renamed to url in API v2.13.0
        url = orig_url

    legacy = APIVersion.compare(self._client().api_version, (2, 13, 0)) < 0

    form = ParamDict.with_hash(hash)
    form.required_str("origUrl" if legacy else "url", url)
    form.optional_str("newUrl", new_url)
    if not legacy:
        form.optional_int("tier", tier)

    await self._request_text("POST", "torrents/editTracker", data=form)
[docs]
async def remove_trackers(self, hash: InfoHash, urls: Iterable[str]) -> None:
    """
    Remove trackers from a torrent.

    :param hash: Info hash.
    :param urls: Tracker URLs, sent ``|``-separated.
        A bare ``str`` is rejected.
    :raises ValueError: if ``urls`` is a ``str``.
    """
    _check_iterable_except_str("urls", urls)

    # Since API v2.2.0
    form = ParamDict.with_hash(hash)
    form.required_list("urls", urls, "|")

    await self._request_text("POST", "torrents/removeTrackers", data=form)
[docs]
async def add_peers(self, hashes: InfoHashes, peers: Iterable[str]) -> None:
    """
    Add peers to torrents.

    :param hashes: A list of info hashes.
    :param peers: Peers to add, sent ``|``-separated.
        A bare ``str`` is rejected.
    :raises ValueError: if ``peers`` is a ``str``.
    """
    _check_iterable_except_str("peers", peers)

    form = ParamDict.with_hashes(hashes)
    form.required_list("peers", peers, "|")

    await self._request_text("POST", "torrents/addPeers", data=form)
[docs]
async def top_prio(self, hashes: InfoHashesOrAll) -> None:
    """
    Send ``torrents/topPrio`` for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/topPrio", data=form)
[docs]
async def bottom_prio(self, hashes: InfoHashesOrAll) -> None:
    """
    Send ``torrents/bottomPrio`` for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/bottomPrio", data=form)
[docs]
async def increase_prio(self, hashes: InfoHashesOrAll) -> None:
    """
    Send ``torrents/increasePrio`` for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/increasePrio", data=form)
[docs]
async def decrease_prio(self, hashes: InfoHashesOrAll) -> None:
    """
    Send ``torrents/decreasePrio`` for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/decreasePrio", data=form)
[docs]
async def file_prio(self, hash: InfoHash, id: Iterable[int], priority: int) -> None:
    """
    Prioritize files in a torrent.

    :param hash: Info hash
    :param id: A list of file indices to prioritize.
        A single ``int`` is also accepted and treated as a one-item list.
    :param priority: Priority, :class:`.FilePriority`.
    """
    # id may be a list since API v2.2.0
    indices = (id,) if isinstance(id, int) else id

    form = ParamDict.with_hash(hash)
    form.required_list("id", indices, "|")
    form.required_int("priority", priority)

    await self._request_text("POST", "torrents/filePrio", data=form)
[docs]
async def download_limit(self, hashes: InfoHashesOrAll) -> Dict[str, int]:
    """
    Get torrent download limits.

    Returns a dict that maps each info hash to its download speed
    limit in bytes/second.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    query = ParamDict.with_hashes_or_all(hashes)
    return await self._request_json(  # type: ignore[no-any-return]
        "GET",
        "torrents/downloadLimit",
        params=query,
    )
[docs]
async def set_download_limit(self, hashes: InfoHashesOrAll, limit: int) -> None:
    """
    Update torrent download limits.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param limit: Download limit in bytes/second.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    form.required_int("limit", limit)

    await self._request_text("POST", "torrents/setDownloadLimit", data=form)
[docs]
async def set_share_limits(
    self,
    hashes: InfoHashesOrAll,
    ratio_limit: Union[float, RatioLimits],
    seeding_time_limit: Union[timedelta, int, SeedingTimeLimits],
    inactive_seeding_time_limit: Union[
        timedelta, int, SeedingTimeLimits, None
    ] = None,
    share_limit_action: Union[str, ShareLimitAction, None] = None,
) -> None:
    """
    Set share limits for torrents.

    Durations are sent in minutes.  When the server answers with
    :exc:`~.exc.BadRequestError` and an argument that newer versions
    require was left out, a note naming that argument is attached to the
    exception before it is re-raised.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param ratio_limit: A number or :attr:`.RatioLimits.UNSET`.
    :param seeding_time_limit: :class:`~datetime.timedelta`, or
        :class:`.SeedingTimeLimits` constants.
    :param inactive_seeding_time_limit: :class:`~datetime.timedelta`, or
        :class:`.InactiveSeedingTimeLimits` constants.
        Required since qBittorrent v4.6.0 (API 2.9.2).
    :param share_limit_action: a str, or
        :class:`.ShareLimitAction` constants.
        Required since qBittorrent v5.2.0 (API 2.12.0).
    """
    # since API v2.0.1
    client = self._client()

    form = ParamDict.with_hashes_or_all(hashes)
    form.required_float("ratioLimit", ratio_limit)
    form.required_duration("seedingTimeLimit", seeding_time_limit, TimeUnit.MINUTES)
    if inactive_seeding_time_limit is not None:
        form.required_duration(
            "inactiveSeedingTimeLimit", inactive_seeding_time_limit, TimeUnit.MINUTES
        )
    if share_limit_action is not None:
        form.required_str("shareLimitAction", str(share_limit_action))

    try:
        await client.request_text("POST", "torrents/setShareLimits", data=form)
    except exc.BadRequestError as ex:
        # (argument value, API version that requires it, note to attach)
        requirements = (
            (
                inactive_seeding_time_limit,
                (2, 9, 2),
                "Argument 'inactive_seeding_time_limit' is required since qBittorrent 4.6.0",
            ),
            (
                share_limit_action,
                (2, 12, 0),
                "Argument 'share_limit_action' is required since qBittorrent 5.2.0",
            ),
        )
        for supplied, required_since, note in requirements:
            if supplied is None and APIVersion.compare(client.api_version, required_since) >= 0:
                exc._add_note(ex, note, logger=client._logger)
        raise
[docs]
async def upload_limit(self, hashes: InfoHashesOrAll) -> Dict[str, int]:
    """
    Get torrent upload limits.

    Returns a dict that maps each info hash to its upload speed
    limit in bytes/second.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    query = ParamDict.with_hashes_or_all(hashes)
    limits = await self._request_json(
        "GET",
        "torrents/uploadLimit",
        params=query,
    )
    return limits  # type: ignore[no-any-return]
[docs]
async def set_upload_limit(self, hashes: InfoHashesOrAll, limit: int) -> None:
    """
    Update torrent upload limits.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param limit: Upload limit in bytes/second.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    form.required_int("limit", limit)

    await self._request_text("POST", "torrents/setUploadLimit", data=form)
[docs]
async def set_location(
    self,
    hashes: InfoHashesOrAll,
    location: StrPath,
) -> None:
    """
    Change location (save path) for torrents.

    This method also turns off auto torrent management (AutoTMM)
    for torrents.

    See also :meth:`~.set_save_path`.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param location: Location.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    form.required_path("location", location)

    await self._request_text("POST", "torrents/setLocation", data=form)
[docs]
async def set_save_path(
    self,
    id: InfoHashesOrAll,
    path: StrPath,
) -> None:
    """
    Change save path (location) for torrents.

    This method causes no effect to torrents with auto torrent
    management (AutoTMM) enabled.

    Available since qBittorrent v4.4.0.

    See also :meth:`~.set_location`.

    :param id: A list of info hashes or ``all`` for all torrents.
    :param path: Save path.
    """
    # since API v2.8.4
    # the hashes are sent under the "id" key for this endpoint
    form = ParamDict.with_hashes_or_all(id, key="id")
    form.required_path("path", path)

    await self._request_text("POST", "torrents/setSavePath", data=form)
[docs]
async def set_download_path(
    self,
    id: InfoHashesOrAll,
    path: StrPath,
) -> None:
    """
    Change download path for torrents.

    Available since qBittorrent v4.4.0.

    :param id: A list of info hashes or ``all`` for all torrents.
    :param path: Download path.
    """
    # since API v2.8.4
    # the hashes are sent under the "id" key for this endpoint
    form = ParamDict.with_hashes_or_all(id, key="id")
    form.required_path("path", path)

    await self._request_text("POST", "torrents/setDownloadPath", data=form)
[docs]
async def rename(self, hash: InfoHash, name: str) -> None:
    """
    Rename a torrent.

    :param hash: Info hash.
    :param name: New torrent name.
    """
    form = ParamDict.with_hash(hash)
    form.required_str("name", name)

    await self._request_text("POST", "torrents/rename", data=form)
[docs]
async def set_category(self, hashes: InfoHashesOrAll, category: str) -> None:
    """
    Change torrents' category.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param category: Category name. An empty string indicates no category.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    form.required_str("category", category)

    await self._request_text("POST", "torrents/setCategory", data=form)
# since API v2.1.1
[docs]
async def categories(self) -> Dict[str, Category]:
    """
    Get categories.

    Returns a dict that maps each category name to a :class:`.Category`.
    """
    mapping = await self._request_mapped_dict(
        Category,
        "GET",
        "torrents/categories",
    )
    return mapping
[docs]
async def create_category(self, category: str, save_path: StrPath) -> None:
    """
    Create category.

    :param category: Category name.
    :param save_path: Save path of the category.
    """
    form = ParamDict()
    form.required_str("category", category)
    form.required_path("savePath", save_path)

    await self._request_text("POST", "torrents/createCategory", data=form)
[docs]
async def edit_category(self, category: str, save_path: StrPath) -> None:
    """
    Edit category.

    :param category: Category name.
    :param save_path: Save path of the category.
        An empty string selects the default save path.
    """
    # since API v2.1.0
    form = ParamDict()
    form.required_str("category", category)
    form.required_path("savePath", save_path)

    await self._request_text("POST", "torrents/editCategory", data=form)
[docs]
async def remove_categories(self, categories: Iterable[str]) -> None:
    """
    Remove categories.

    :param categories: Category names, sent newline-separated.
        A bare ``str`` is rejected.
    :raises ValueError: if ``categories`` is a ``str``.
    """
    _check_iterable_except_str("categories", categories)

    form = ParamDict()
    form.required_list("categories", categories, "\n")

    await self._request_text("POST", "torrents/removeCategories", data=form)
[docs]
async def set_auto_management(self, hashes: InfoHashesOrAll, enable: bool) -> None:
    """
    Send ``torrents/setAutoManagement`` for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param enable: Value sent as the ``enable`` field.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    form.optional_bool("enable", enable)

    await self._request_text("POST", "torrents/setAutoManagement", data=form)
[docs]
async def toggle_sequential_download(self, hashes: InfoHashesOrAll) -> None:
    """
    Flip ``seq_dl`` values for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    await self._request_text(
        "POST",
        "torrents/toggleSequentialDownload",
        data=ParamDict.with_hashes_or_all(hashes),
    )
[docs]
@virtual
async def set_sequential_download(self, hashes: InfoHashesOrAll, value: bool) -> None:
    """
    Change ``seq_dl`` for torrents.

    .. note::

        This method is implemented by querying torrent ``seq_dl`` values, and
        :meth:`toggling <.TorrentsAPI.toggle_sequential_download>` them if needed.

    :param hashes: Torrents to update.
    :param value: Desired ``seq_dl`` value.
    """
    # Only torrents whose current flag differs need to be flipped.
    mismatched = [
        torrent.hash
        for torrent in await self.info(hashes=hashes)
        if torrent.seq_dl != value
    ]
    if mismatched:
        await self.toggle_sequential_download(mismatched)
[docs]
async def toggle_first_last_piece_prio(self, hashes: InfoHashesOrAll) -> None:
    """
    Flip ``f_l_piece_prio`` values for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    await self._request_text("POST", "torrents/toggleFirstLastPiecePrio", data=form)
[docs]
@virtual
async def set_first_last_piece_prio(self, hashes: InfoHashesOrAll, value: bool) -> None:
    """
    Change ``f_l_piece_prio`` for torrents.

    .. note::

        This method is implemented by querying torrent ``f_l_piece_prio`` values, and
        :meth:`toggling <.TorrentsAPI.toggle_first_last_piece_prio>` them if needed.

    :param hashes: Torrents to update.
    :param value: Desired ``f_l_piece_prio`` value.
    """
    # Only torrents whose current flag differs need to be flipped.
    mismatched = [
        torrent.hash
        for torrent in await self.info(hashes=hashes)
        if torrent.f_l_piece_prio != value
    ]
    if mismatched:
        await self.toggle_first_last_piece_prio(mismatched)
[docs]
async def set_force_start(self, hashes: InfoHashesOrAll, force: bool) -> None:
    """
    Set ``force_start`` flags for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param force: New flag value.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    form.required_bool("value", force)

    await self._request_text("POST", "torrents/setForceStart", data=form)
[docs]
async def set_super_seeding(self, hashes: InfoHashesOrAll, value: bool) -> None:
    """
    Set ``super_seeding`` flags for torrents.

    :param hashes: A list of info hashes or ``all`` for all torrents.
    :param value: New flag value.
    """
    form = ParamDict.with_hashes_or_all(hashes)
    # default value (invalid value) is treated as false
    form.required_bool("value", value)

    await self._request_text("POST", "torrents/setSuperSeeding", data=form)
# Typing-only signatures; the runtime implementation accepts both forms.
@overload
async def rename_file(self, hash: InfoHash, id: int, name: str) -> None:
    """Index-based form: available until client 4.3.3."""

@overload
async def rename_file(self, hash: InfoHash, old_path: str, new_path: str) -> None:
    """Path-based form: available since client 4.3.3 or API 2.7.0."""
[docs]
async def rename_file(self, hash: InfoHash, *args: Any, **kwargs: Any) -> None:
    """
    Rename a file in torrent.

    On qBittorrent v4.3.3 or later, the signature is ``rename_file(hash, old_path, new_path)``.

    Below qBittorrent v4.3.3, use ``rename_file(hash, id, name)``, where ``id`` is
    the file index from :meth:`~.TorrentsAPI.files`.

    Both forms accept their two arguments positionally, by keyword, or
    as one positional followed by one keyword.  The form that is sent is
    chosen from the type of the first argument: ``str`` selects
    ``oldPath``/``newPath``, ``int`` selects ``id``/``name``.

    Available since qBittorrent v4.2.1 (API 2.4.0).
    Signature changed in v4.3.3 (API 2.7.0).

    See also: https://github.com/qbittorrent/qBittorrent/pull/13995

    :raises TypeError: on too many, missing, or unexpected arguments, or
        when the first argument is neither ``str`` nor ``int``.
    """
    # API 2.4.0
    client = self._client()
    # Only used to pick which argument name to report as missing.
    legacy = APIVersion.compare(client.api_version, (2, 7, 0)) < 0

    # Normalize *args/**kwargs into exactly two positional values.
    nargs = len(args)
    if nargs > 2:
        raise TypeError("Too many arguments")
    if nargs == 2:
        pass
    elif nargs == 1:
        # Missing one argument
        # An int first argument implies the (id, name) form.
        key = "name" if isinstance(args[0], int) else "new_path"
        try:
            args += (kwargs.pop(key),)
        except KeyError:
            raise TypeError(f"Missing argument {key!r}") from None
    else:
        # Keyword-only call: the presence of "old_path" or "id" picks the form.
        try:
            if "old_path" in kwargs:
                args = (kwargs.pop("old_path"), kwargs.pop("new_path"))
            elif "id" in kwargs:
                args = (kwargs.pop("id"), kwargs.pop("name"))
            else:
                raise KeyError("id" if legacy else "old_path")
        except KeyError as ex:
            raise TypeError(f"Missing argument {ex.args[0]!r}") from None
    # Anything still left in kwargs was not consumed above.
    if kwargs:
        raise TypeError(f"Extra keyword arguments: {kwargs.keys()!r}")
    arg, arg2 = args
    data = ParamDict.with_hash(hash)
    if isinstance(arg, str):
        data.required_str("oldPath", arg)
        data.required_str("newPath", arg2)
    elif isinstance(arg, int):
        data.required_int("id", arg)
        data.required_str("name", arg2)
    else:
        raise TypeError(f"Bad call signature: ({type(hash)!r}, {type(arg)!r}, {type(arg2)!r})")
    try:
        await self._request_text(
            "POST",
            "torrents/renameFile",
            data=data,
        )
    except exc.BadRequestError as ex:
        # NOTE(review): this check uses API 2.4.0 and the note cites
        # qBittorrent 4.2.1, whereas the docstring and ``legacy`` above
        # place the signature change at v4.3.3 / API 2.7.0 -- confirm
        # which version is intended.
        if APIVersion.compare(self._client().api_version, (2, 4, 0)) >= 0:
            note = (
                "From qBittorrent 4.2.1, rename_file(hash, id, name) was changed"
                " to rename_file(hash, old_path, new_path)."
                " BadRequestError may raise if rename_file() was called"
                " with inappropriate arguments."
            )
            exc._add_note(ex, note, logger=self._client()._logger)
        raise
[docs]
@since((2, 8, 0))
async def rename_folder(self, hash: InfoHash, old_path: str, new_path: str) -> None:
    """
    Rename a folder.

    :param hash: Info hash.
    :param old_path: Current folder path.
    :param new_path: New folder path.
    """
    # API 2.8.0
    form = ParamDict.with_hash(hash)
    form.required_path("oldPath", old_path)
    form.required_path("newPath", new_path)

    await self._request_text("POST", "torrents/renameFolder", data=form)
[docs]
async def export(self, hash: InfoHash) -> bytes:
    """
    Export a torrent as ``bytes``.

    :param hash: Info hash.
    :return: The raw body of the ``torrents/export`` response.
    """
    # since API v2.8.11
    query = ParamDict.with_hash(hash)
    resp = await self._client().request("GET", "torrents/export", params=query)

    async with resp:
        content = await resp.read()
    return content
[docs]
@since((2, 10, 4))
async def ssl_parameters(self, hash: InfoHash) -> TorrentSSLParameters:
    """
    Get SSL parameters of a torrent.

    :param hash: Info hash.
    """
    query = ParamDict.with_hash(hash)
    result = await self._request_mapped_object(
        TorrentSSLParameters,
        "GET",
        "torrents/SSLParameters",
        params=query,
    )
    return result
[docs]
@since((2, 10, 4))
async def set_ssl_parameters(
    self,
    hash: InfoHash,
    ssl_certificate: str,
    ssl_private_key: str,
    ssl_dh_params: str,
) -> None:
    """
    Set SSL parameters of a torrent.

    :param hash: Info hash.
    :param ssl_certificate: Sent as ``ssl_certificate``.
    :param ssl_private_key: Sent as ``ssl_private_key``.
    :param ssl_dh_params: Sent as ``ssl_dh_params``.
    """
    form = ParamDict.with_hash(hash)
    form.required_str("ssl_certificate", ssl_certificate)
    form.required_str("ssl_private_key", ssl_private_key)
    form.required_str("ssl_dh_params", ssl_dh_params)

    await self._request_text("POST", "torrents/setSSLParameters", data=form)
def _convert_duration(
delta: Union[timedelta, float],
unit: TimeUnit,
) -> float:
"""
Convert duration to float
timedelta is converted to the unit or seconds if not specified.
int/float is returned unchanged.
"""
if isinstance(delta, timedelta):
delta = unit.from_seconds(delta.total_seconds())
if TYPE_CHECKING:
assert isinstance(delta, (int, float))
return delta
def _convert_path(path: StrPath) -> str:
"""Convert path-like objects to POSIX path str"""
return os.fsdecode(path).replace("\\", "/")