Source code for yumemi.anidb

import socket
import threading
import time
import typing as t
import zlib

import attrs
from cryptography.hazmat.primitives import ciphers, hashes, padding

from .exceptions import ClientError, ServerError

[docs]@attrs.define class Connection: """ Low-level conection to the AniDB UDP API with thread safe `flood protection <>`_ (packet rate limit, one packet every two seconds). """ server_host: str = '' server_port: int = 9000 local_port: int = 8888 _lock: threading.RLock = attrs.field(init=False) _socket: socket.socket = attrs.field(init=False) _send_time: float = attrs.field(init=False) _send_count: int = attrs.field(init=False) _send_drop_count: int = attrs.field(init=False) def __attrs_post_init__(self): self._lock = threading.RLock() self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._socket.bind(('', self.local_port)) self._socket.settimeout(4) self._send_time = 0 self._send_count = 0 self._send_drop_count = 0
[docs] def send(self, data: bytes) -> None: if len(data) > 1400: raise ClientError("Can't send more than 1400 bytes") with self._lock: delay_secs = 0 if self._send_count > 4: # "Short Term" policy (1 packet per 2 seconds). # Enforced after the first 5 packets. delay_secs = 2 if self._send_drop_count > 4: # "Long Term" policy (1 packet per 4 seconds). # Used when server starts dropping packets. delay_secs = 4 t = time.time() if t < self._send_time + delay_secs: time.sleep(self._send_time + delay_secs - t) try: self._socket.sendto(data, (self.server_host, self.server_port)) finally: self._send_count += 1 self._send_time = time.time()
[docs] def recv(self) -> bytes: data = b'' try: # Replies from the server will never exceed 1400 bytes. data = self._socket.recv(1400) except socket.timeout: with self._lock: self._send_drop_count += 1 else: with self._lock: if self._send_drop_count > 0: self._send_drop_count -= 1 if not data: raise ServerError('Received no data from the API') return data
[docs]@attrs.define class CodecPlain: encoding: str
[docs] def encode(self, data: str) -> bytes: return data.encode(self.encoding)
[docs] def decode(self, data: bytes) -> str: if data.startswith(b'\0\0'): data = zlib.decompressobj().decompress(data[2:]) return data.decode(self.encoding)
[docs]@attrs.define class CodecCrypt(CodecPlain): encrypt_key: str = attrs.field(repr=False) _cipher: ciphers.Cipher = attrs.field(init=False) _padding: padding.PKCS7 = attrs.field(init=False) def __attrs_post_init__(self): digest = hashes.Hash(hashes.MD5()) digest.update(self.encrypt_key.encode(self.encoding)) key_hash = digest.finalize() self._cipher = ciphers.Cipher( ciphers.algorithms.AES128(key_hash), ciphers.modes.ECB(), ) self._padding = padding.PKCS7(128)
[docs] def encode(self, data: str) -> bytes: encoded_data = super().encode(data) padder = self._padding.padder() padded_data = padder.update(encoded_data) padded_data += padder.finalize() encryptor = self._cipher.encryptor() encrypted_data = encryptor.update(padded_data) encrypted_data += encryptor.finalize() return encrypted_data
[docs] def decode(self, data: bytes) -> str: decryptor = self._cipher.decryptor() decrypted_data = decryptor.update(data) decrypted_data += decryptor.finalize() unpadder = self._padding.unpadder() unpadded_data = unpadder.update(decrypted_data) unpadded_data += unpadder.finalize() decoded_data = super().decode(unpadded_data) return decoded_data
[docs]@attrs.define class Result: command: str params: dict[str, t.Any] code: int message: str data: tuple[tuple[str, ...], ...]
[docs]@attrs.define class Client: client_name: str client_version: int _connection: Connection = attrs.field(init=False) _lock: threading.RLock = attrs.field(init=False) _codec: CodecPlain = attrs.field(init=False) _session_key: t.Optional[str] = attrs.field(init=False) def __attrs_post_init__(self): self._connection = Connection() self._lock = threading.RLock() self._codec = CodecPlain('ASCII') self._session_key = None
[docs] def command(self, command: str, params: t.Optional[dict[str, t.Any]] = None, ) -> Result: """ Sends a command to the API, wait for a response, and return the command result. Exceptions are raised only for common client and server side errors. You still need to check the result code to see if the command succeeded or not. Commands documentation is on `AniDB Wiki`_. .. _AniDB Wiki: Args: command: Command name. params: Command parameters. Returns: Command result. Raises: ClientError: Raised for common client side errors, like invalid command or invalid parameters. ServerError: When something went wrong on the server side. """ command = command.upper() params = params or {} params_copy = params.copy() for k, v in params_copy.items(): if v is None: v = '' elif isinstance(v, bool): v = int(v) params_copy[k] = str(v).replace('&', '&amp;').replace('\n', '<br />') with self._lock: if command not in {'PING', 'ENCODING', 'ENCRYPT', 'AUTH', 'VERSION'}: if not self._session_key: result = Result( command=command, params=params, code=501, message='LOGIN FIRST', data=tuple(), ) raise ClientError.from_result(result) params_copy['s'] = self._session_key params_str = '&'.join(f'{k}={v}' for k, v in params_copy.items()) request = self._codec.encode(f'{command} {params_str}'.strip()) self._connection.send(request) response = self._connection.recv() lines = self._codec.decode(response).split('\n') code, message = lines[0].split(' ', maxsplit=1) data = tuple( tuple(field.replace('<br />', '\n') for field in line.split('|')) for line in lines[1:] ) result = Result( command=command, params=params, code=int(code), message=message, data=data, ) if result.code >= 600: raise ServerError.from_result(result) elif result.code >= 500: raise ClientError.from_result(result) return result
[docs] def ping(self) -> bool: """ Check if API is available. Does not require active session, so it may be called even if a user not logged in. Returns: ``True`` if API is available, ``False`` otherwise. See also: :meth:`command` """ try: return self.command('PING').code == 300 except Exception: return False
[docs] def encrypt(self, username: str, api_key: str) -> None: """ Start encrypted session. A normal authentication is still necessary and should follow the `encrypt` call. Args: username: User name. api_key: API key, defined in profile settings. Raises: ClientError: Raised when encrypted session could not be established. See also: :meth:`command` """ with self._lock: result = self.command('ENCRYPT', { 'user': username, 'type': 1, }) if result.code != 209: raise ClientError.from_result(result) key = api_key + result.message.split()[0] self._codec = CodecCrypt(self._codec.encoding, key)
[docs] def auth(self, username: str, password: str) -> Result: """ Authenticate to AniDB. Args: username: User name. password: User password. Returns: AUTH command result with session key removed from the result message. Raises: ClientError: Raised when authentication failed. See also: :meth:`command` """ with self._lock: result = self.command('AUTH', { 'user': username, 'pass': password, 'protover': 3, 'client': self.client_name, 'clientver': self.client_version, 'enc': 'UTF-8', 'comp': True, }) if result.code not in {200, 201}: raise ClientError.from_result(result) self._codec = CodecPlain('UTF-8') self._session_key, message = result.message.split(maxsplit=1) result.message = message return result
[docs] def logout(self) -> None: """ Logout from AniDB. See also: :meth:`command` """ with self._lock: result = self.command('LOGOUT') if result.code == 203: self._codec = CodecPlain('ASCII') self._session_key = None
[docs] def check_session(self) -> bool: """ Check if a user is logged in and the session is still active on the server. This method can be called once every 30 minutes to keep the connection alive. See also: :meth:`command` """ with self._lock: return self._session_key is not None and self.command('UPTIME').code == 208