Source code for vkpore.vkpore

"""Module with core class for organizing event flow."""

from typing import List, Dict, Awaitable, Optional, Iterable, Tuple
from random import choice
from asyncio import AbstractEventLoop as AEL
import asyncio
import logging

from aiohttp import ClientSession

from .vkclient import VkClient
from .events import Event, Callback, MessageNew, EventRaw
from .utils import wait_with_stopped


class Vkpore():
    """
    Class for receiving events, calling methods, callback registration and
    execution. You can specify loop and session to use.
    """

    def __init__(self, tokens: Iterable[str], loop: Optional[AEL] = None,
                 session: Optional[ClientSession] = None):
        """
        Store configuration; no network activity happens here.

        :param tokens: access tokens; one `VkClient` is created per token
            when `start` is called.
        :param loop: event loop to use; defaults to
            `asyncio.get_event_loop()`.
        :param session: session handed to every client; when omitted a
            `ClientSession` is created in `start`.
        """
        self._loop: AEL = loop or asyncio.get_event_loop()
        # Event name -> callbacks registered through `on`.
        self._callbacks: Dict[str, List[Callback]] = {}
        # Longpoll loops and in-flight event futures awaited by `stop`.
        self._futures: List[asyncio.Future] = []
        self._tokens: Tuple[str, ...] = tuple(tokens)
        # Group id -> clients initialized for that group.
        self._clients: Dict[int, List[VkClient]] = {}

        # The application starts in the "stopped" state; `start` clears it.
        self._stopped: asyncio.Event = asyncio.Event()
        self._stopped.set()

        self._session: Optional[ClientSession] = session

    def get_client(self, group_id):
        """Return random client for specified group_id.

        Returns None when no client is registered for the group.
        """
        clients = self._clients.get(group_id)

        if not clients:
            return None

        return choice(clients)

    @staticmethod
    def _get_event_class(update_type):  # pragma: no cover
        """Return the event class used to wrap updates of `update_type`."""
        if update_type == "message_new":
            return MessageNew

        return EventRaw

    async def _longpoll_loop(self, group_id):
        """
        Receive updates for `group_id` and dispatch them as events until
        `wait_with_stopped` returns None (presumably once the application
        is stopped -- confirm against `utils.wait_with_stopped`).
        """
        get_updates = self.get_client(group_id).longpoll()

        # Schedule the waiter on the application's loop, like every other
        # future created by this class, instead of instantiating Task
        # directly.
        stopped = asyncio.ensure_future(self._stopped.wait(), loop=self._loop)

        try:
            while True:
                updates = await wait_with_stopped(get_updates(), stopped)

                if updates is None:
                    break

                for update in updates:
                    event_class = self._get_event_class(update["type"])
                    self.dispatch(event_class(group_id, update["object"]))
        finally:
            # Do not leave the waiter pending if the loop exits through an
            # exception; cancelling an already finished task is a no-op.
            stopped.cancel()

    async def start(self):
        """Start application related loops and perform initializations."""
        if self._session is None:
            self._session = ClientSession()

        # Create and initialize clients
        for token in self._tokens:
            client = VkClient(token, self._session, self._loop)

            await client.initialize()

            self._clients.setdefault(client.group_id, []).append(client)

        # Application is now considered running
        self._stopped.clear()

        # Create and start loops for receiving updates for groups
        for group_id in self._clients:
            self._futures.append(
                asyncio.ensure_future(
                    self._longpoll_loop(group_id), loop=self._loop
                )
            )

        # Start execute loops for clients
        for clients in self._clients.values():
            for client in clients:
                client.start()

        logging.info("Started")

    async def stop(self):
        """
        Stop client loops, wait for all tasks to complete and close
        sessions.

        Exceptions raised by the awaited futures propagate out of
        `asyncio.gather`.
        """
        logging.info("Stopping")

        self._stopped.set()

        # Wait for longpoll loops and running callbacks to finish
        await asyncio.gather(*self._futures)

        # Wait for clients to stop
        await asyncio.gather(*[
            client.stop()
            for group in self._clients.values()
            for client in group
        ])

        # Close session
        if self._session:
            await self._session.close()

        logging.info("Stopped")

    def on(self, event: str):
        """Return decorator for subscribing callback to event.

        The decorator returns the callback unchanged, so the decorated name
        stays bound to the original function.
        """
        def decorator(function: Callback):
            self._callbacks.setdefault(event, []).append(function)
            return function

        return decorator

    def _forget_future(self, future):
        """Drop a cleanly finished event future from the tracking list."""
        # Keep cancelled and failed futures so that `stop` still reports
        # them through `asyncio.gather`, exactly as if they were never
        # pruned.
        if future.cancelled() or future.exception() is not None:
            return

        try:
            self._futures.remove(future)
        except ValueError:
            # Already removed; nothing to do.
            pass

    def dispatch(self, event: Event):
        """Call callbacks for event.

        Returns the future running the callbacks, or None when the event
        has no callbacks or the application is stopped.
        """
        callbacks = self._callbacks.get(event.name)

        if not callbacks or self._stopped.is_set():
            return None

        event.initialize(self, callbacks)

        future = asyncio.ensure_future(event.next(), loop=self._loop)

        self._futures.append(future)
        # Without pruning the list grows by one entry per event for the
        # whole lifetime of the application.
        future.add_done_callback(self._forget_future)

        logging.debug("Dispatched event: %s", event)

        return future

    def run_until_complete(self, awaitable: Awaitable):  # pragma: no cover
        """Run specified awaitable in application's loop and return its
        result."""
        return self._loop.run_until_complete(awaitable)

    def run(self):  # pragma: no cover
        """Run application and stop on KeyboardInterrupt."""
        try:
            self._loop.run_until_complete(self.start())
            self._loop.run_forever()
        except KeyboardInterrupt:
            self.run_until_complete(self.stop())