Source code for slash.plugins.plugin_manager

import collections
import gossip
import itertools
import os
import re
import sys
from .interface import PluginInterface
from .registration_info import RegistrationInfo
from ..utils.marks import try_get_mark
from ..utils import parallel_utils
from ..utils.python import reraise
from ..conf import config
from contextlib import contextmanager
from emport import import_file
from sentinels import NOTHING
from vintage import warn_deprecation
import logbook

_logger = logbook.Logger(__name__)

_SKIPPED_PLUGIN_METHOD_NAMES = set(dir(PluginInterface))
PluginInfo = collections.namedtuple("PluginInfo", ("plugin_instance", "is_internal"))
_DEPRECATED_CHARACTERS = '-_'

class IncompatiblePlugin(ValueError):
    pass

class UnknownPlugin(ValueError):
    pass

class IllegalPluginName(ValueError):
    pass

[docs]class PluginManager(object): def __init__(self): super(PluginManager, self).__init__() self._installed = {} self._cmd_line_to_name = {} self._config_to_name = {} self._pending_activation = set() self._pending_deactivation = set() self._active = set() self.install_builtin_plugins() @contextmanager def restoring_state_context(self): previous_installed = self._installed.copy() previous_active = {name: self._installed[name] for name in self._active} try: yield finally: for name in set(previous_installed) - set(self._installed): self.install(previous_installed[name].plugin_instance) for name in set(previous_active) - self._active: self.activate(name) for name in self._active - set(previous_active): self.deactivate(name) for name in set(self._installed) - set(previous_installed): self.uninstall(name)
[docs] def discover(self): """ Iterates over all search paths and loads plugins """ for search_path in config.root.plugins.search_paths: for path, _, filenames in os.walk(search_path): for filename in filenames: if not filename.endswith(".py"): continue module = import_file(os.path.join(path, filename)) install_func = getattr(module, "install_plugins", None) if install_func is None: continue install_func()
[docs] def get_installed_plugins(self, include_internals=True): """ Returns a dict mapping plugin names to currently installed plugins """ return {plugin_name: plugin_info.plugin_instance for plugin_name, plugin_info in self._installed.items() if include_internals or (not plugin_info.is_internal)}
[docs] def get_active_plugins(self): """ Returns a dict mapping plugin names to currently active plugins """ return dict(self._iterate_active_plugins())
def _iterate_active_plugins(self): for active_name in self._active: yield (active_name, self._get_installed_plugin_instance_by_name(active_name))
[docs] def get_future_active_plugins(self): """ Returns a dictionary of plugins intended to be active once the 'pending activation' mechanism is finished """ returned = self.get_active_plugins() for name in self._pending_activation: returned[name] = self.get_plugin(name) for name in self._pending_deactivation: returned.pop(name, None) return returned
[docs] def get_plugin(self, plugin_name): """ Retrieves a registered plugin by name, or raises a LookupError """ return self._installed[plugin_name].plugin_instance
[docs] def is_internal_plugin(self, plugin): """ Returns rather installed plugin is internal plugin """ plugin_name = plugin if isinstance(plugin, str) else plugin.get_name() return self._installed[plugin_name].is_internal
def _is_parallel_supported(self, plugin): if not parallel_utils.is_parallel_session(): return False plugin_parallel_mode = try_get_mark(plugin, 'parallel_mode', parallel_utils.ParallelPluginModes.ENABLED) if plugin_parallel_mode == parallel_utils.ParallelPluginModes.ENABLED: return False if (plugin_parallel_mode == parallel_utils.ParallelPluginModes.DISABLED) \ or (plugin_parallel_mode == parallel_utils.ParallelPluginModes.PARENT_ONLY and parallel_utils.is_child_session()) \ or (plugin_parallel_mode == parallel_utils.ParallelPluginModes.CHILD_ONLY and parallel_utils.is_parent_session()): return True return False def configure_for_parallel_mode(self): for plugin in self.get_installed_plugins().values(): if self._is_parallel_supported(plugin): self.deactivate_later(plugin)
[docs] def install(self, plugin, activate=False, activate_later=False, is_internal=False): """ Installs a plugin object to the plugin mechanism. ``plugin`` must be an object deriving from :class:`slash.plugins.PluginInterface`. """ if not isinstance(plugin, PluginInterface): raise IncompatiblePlugin("Invalid plugin type: {!r}".format(type(plugin))) plugin_name = plugin.get_name() if re.search(r'[^A-Za-z0-9_ -]', plugin_name): raise IllegalPluginName("Illegal plugin name: {}".format(plugin_name)) if any(char in plugin_name for char in _DEPRECATED_CHARACTERS): warn_deprecation("In the future, dashes and underscore will not be allowed in plugin names - " "spaces should be used instead (plugin name: {!r})".format(plugin_name)) self._configure(plugin) self._installed[plugin_name] = PluginInfo(plugin, is_internal) self._cmd_line_to_name[self.normalize_command_line_name(plugin_name)] = plugin_name self._config_to_name[self.normalize_config_name(plugin_name)] = plugin_name if not hasattr(plugin, '__toggles__'): plugin.__toggles__ = { 'session': gossip.Toggle(), } if activate: try: self.activate(plugin_name) except IncompatiblePlugin: exc_info = sys.exc_info() self.uninstall(plugin) reraise(*exc_info) if activate_later: self.activate_later(plugin_name)
def install_builtin_plugins(self): for builtin_plugin_module in self._iter_builtin_plugin_modules(): module = __import__( "slash.plugins.builtin.{}".format(builtin_plugin_module), fromlist=[""] ) self.install(module.Plugin()) def _iter_builtin_plugin_modules(self): builtin_dir = os.path.join(os.path.dirname(__file__), "builtin") for filename in os.listdir(builtin_dir): if filename.startswith("_") or filename.startswith(".") or not filename.endswith(".py"): continue yield filename[:-3]
[docs] def uninstall(self, plugin): """ Uninstalls a plugin """ plugin = self._get_installed_plugin(plugin) try: self.deactivate(plugin) except IncompatiblePlugin: pass self._unconfigure(plugin) plugin_name = plugin.get_name() self._installed.pop(plugin_name) cmd_name = self.normalize_command_line_name(plugin_name) self._cmd_line_to_name.pop(cmd_name, None) config_name = self.normalize_config_name(plugin_name) self._config_to_name.pop(config_name, None)
[docs] def uninstall_all(self): """ Uninstalls all installed plugins """ for plugin_info in list(self._installed.values()): self.uninstall(plugin_info.plugin_instance) assert not self._installed
[docs] def activate(self, plugin): """ Activates a plugin, registering its hook callbacks to their respective hooks. :param plugin: either a plugin object or a plugin name """ plugin = self._get_installed_plugin(plugin) plugin_name = plugin.get_name() if self._is_parallel_supported(plugin): _logger.warn("Activating plugin {} though it's configuration for parallel mode doesn't fit to current session".format(plugin.get_name())) plugin.activate() for hook, callback, kwargs in self._get_plugin_registrations(plugin): hook.register(callback, **kwargs) self._active.add(plugin_name)
[docs] def activate_later(self, plugin): """ Adds a plugin to the set of plugins pending activation. It can be remvoed from the queue with :meth:`.deactivate_later` .. seealso:: :meth:`.activate_pending_plugins` """ self._pending_activation.add(self._get_installed_plugin(plugin).get_name())
[docs] def deactivate_later(self, plugin): """ Removes a plugin from the set of plugins pending activation. .. seealso:: :meth:`.activate_pending_plugins` """ self._pending_deactivation.add(self._get_installed_plugin(plugin).get_name())
[docs] def activate_pending_plugins(self): """ Activates all plugins queued with :meth:`.activate_later` """ while self._pending_activation: plugin_name = self._pending_activation.pop() if plugin_name not in self._pending_deactivation: self.activate(plugin_name) while self._pending_deactivation: plugin_name = self._pending_deactivation.pop() if plugin_name in self._active: self.deactivate(plugin_name)
def normalize_command_line_name(self, plugin_name): return plugin_name.replace(' ', '-') def normalize_config_name(self, plugin_name): return plugin_name.replace(' ', '_')
[docs] def deactivate(self, plugin): """ Deactivates a plugin, unregistering all of its hook callbacks :param plugin: either a plugin object or a plugin name """ plugin = self._get_installed_plugin(plugin) plugin_name = plugin.get_name() token = self._get_token(plugin_name) if plugin_name in self._active: gossip.get_global_group().unregister_token(token) self._active.discard(plugin_name) plugin.deactivate()
def _configure(self, plugin): cfg = plugin.get_config() if cfg is not None: warn_deprecation('PluginInterface.get_config() is deprecated. ' 'Please use PluginInterface.get_default_config() instead') else: cfg = plugin.get_default_config() if cfg is not None: plugin_name = plugin.get_name() config_name = self.normalize_config_name(plugin_name) config['plugin_config'].extend({config_name: cfg}) def _unconfigure(self, plugin): plugin_config = config['plugin_config'] config_name = self.normalize_config_name(plugin.get_name()) if config_name in plugin_config: plugin_config.pop(config_name) def _get_token(self, plugin_name): return "slash.plugins.{}".format(plugin_name) def _get_installed_plugin_instance_by_name(self, plugin_name): plugin_info = self._installed.get(plugin_name) if plugin_info is None: return None return plugin_info.plugin_instance def _get_installed_plugin_instance_by_type(self, plugin_class): for plugin in self._installed.values(): if type(plugin.plugin_instance) is plugin_class: # pylint: disable=unidiomatic-typecheck return plugin.plugin_instance return None def _get_installed_plugin(self, plugin): if isinstance(plugin, str): plugin_name = plugin if plugin_name in self._cmd_line_to_name: plugin_name = self._cmd_line_to_name[plugin_name] plugin_instance = self._get_installed_plugin_instance_by_name(plugin_name) elif isinstance(plugin, type): plugin_instance = self._get_installed_plugin_instance_by_type(plugin) plugin_name = plugin_instance.get_name() if plugin_instance is not None else repr(plugin) else: plugin_instance = plugin plugin_name = plugin.get_name() if plugin_instance is None or self._get_installed_plugin_instance_by_name(plugin_name) is not plugin_instance: raise UnknownPlugin("Unknown plugin: {}".format(plugin_name)) return plugin_instance def _get_plugin_registrations(self, plugin): plugin_name = plugin.get_name() returned = [] unknown = [] global_needs = try_get_mark(plugin, 'plugin_needs', []) global_provides = try_get_mark(plugin, 'plugin_provides', []) has_session_end = has_session_start = False register_no_op_hooks = set() if global_provides: register_no_op_hooks.update(hook.full_name for hook in gossip.get_group('slash').get_hooks()) for method_name in dir(type(plugin)): if method_name in _SKIPPED_PLUGIN_METHOD_NAMES: continue method = getattr(plugin, method_name) if not hasattr(method, '__call__'): continue registration_list = try_get_mark(method, 'register_on', NOTHING) if registration_list is not NOTHING: registration_list = registration_list[:] else: if method_name.startswith('_'): continue registration_list = [RegistrationInfo("slash.{}".format(method_name), expect_exists=True)] for registration_info in registration_list: if registration_info.hook_name is None: # asked not to register for nothing continue if not try_get_mark(method, 'register_if', True): continue plugin_needs = list( itertools.chain( try_get_mark(method, 'plugin_needs', []), global_needs, registration_info.register_kwargs.get('needs', []))) plugin_provides = list( itertools.chain( try_get_mark(method, 'plugin_provides', []), global_provides, registration_info.register_kwargs.get('provides', []))) try: if registration_info.expect_exists: hook = gossip.get_hook(registration_info.hook_name) else: hook = gossip.hooks.get_or_create_hook(registration_info.hook_name) if not hook.is_defined() and hook.group.is_strict(): raise LookupError() except LookupError: unknown.append(registration_info.hook_name) continue assert hook is not None register_no_op_hooks.discard(registration_info.hook_name) kwargs = registration_info.register_kwargs.copy() kwargs.update({ 'needs': plugin_needs, 'provides': plugin_provides, 'token': self._get_token(plugin_name), }) if registration_info.hook_name == 'slash.session_start': has_session_start = True kwargs['toggles_on'] = plugin.__toggles__['session'] elif registration_info.hook_name == 'slash.session_end': has_session_end = True kwargs['toggles_off'] = plugin.__toggles__['session'] returned.append((hook, method, kwargs)) if has_session_end and not has_session_start: hook = gossip.get_hook('slash.session_start') returned.append((hook, lambda: None, {'toggles_on': plugin.__toggles__['session']})) register_no_op_hooks.discard(hook.full_name) for hook_name in register_no_op_hooks: hook = gossip.get_hook(hook_name) hook.register_no_op(provides=global_provides, token=self._get_token(plugin_name)) if unknown: raise IncompatiblePlugin("Unknown hooks: {}".format(", ".join(unknown))) return returned
manager = PluginManager()