WIP-0005: Task Queues and State Synchronization¶
Introduction¶
This document proposes a way to manage task queues and state synchronization in Whitebox. The goal is to ensure that tasks are processed reliably, and that the state of the system is consistent across all internals.
Problem¶
Whitebox's plugin system was initially designed based on global state mutation, which, when we transitioned to Gunicorn (which uses multiple workers), turned out to be suboptimal. As the global state is not shared between workers, any state changes made by one worker are not visible to others, primarily dynamic plugin loading/unloading.
For this reason, we need to refactor certain parts of the codebase in a way that allows Whitebox to maintain a consistent state across all workers, and background tasks to not cause any conflicts.
Some examples:
- When a plugin is (un)loaded, it should be reflected across all workers
- Federation building should not be redundantly triggered by multiple workers
Task Queues¶
For task queues, we'll use RQ (Redis Queue) in combination with Redis. RQ is much more lightwight than Celery and should be sufficient for our needs.
Tasks need to be able to run in the background and be retried if they fail. RQ handles this out of the box. They be defined both within kernel and plugins, and can be scheduled to run immediately or at a later time.
State Synchronization¶
To ensure that the state is consistent across all workers, we use multiple combined strategies:
- Distributed Locking: We will use Redis to implement distributed locks to ensure that only one worker can perform certain operations at a time.
- Event Broadcasting: We will use Redis Pub/Sub to broadcast events to all workers when certain state changes occur, such as plugin loading/unloading.
- Periodic State Sync: Workers will periodically sync their state with the central Redis store to ensure that they have the latest state.
Practical application¶
For example, when a plugin is loaded or unloaded, the following steps will occur:
- Before starting to load or unload the plugin, the worker will acquire a distributed lock to ensure that no other worker is performing the same operation
- The worker will then load or unload the plugin
- If the plugin has federation code, the worker will trigger a background task to build the plugin's federation
- Once the plugin is loaded or unloaded, the worker will publish an event to Redis Pub/Sub to notify all other workers about the event, which will then perform load/unload the plugin on their end as well, without triggering a federation build on their own
- In case of a redis reconnection, the workers will resync their state with the central Redis store to ensure that they have the latest state
Example¶
As an example in which direction the implementation should go, here's an example
of how the StateManager
's implementation could look like:
sentinel = object() # Unique sentinel object for missing values
class StateManager:
# Ensures that pub/sub doesn't cause any threading issues, and that
# newest values are available as soon as they happen
_local_lock: threading.Lock
def _pub_sub_callback(self, key, value):
with self._local_lock:
# update value by callback
pass
def get(self, key):
with self._local_lock:
store = self._state.get(key, sentinel)
if store is sentinel:
# handle
return
threshold = time.time() - 300
if store['timestamp'] < threshold:
with self.lock(key):
store = self._fetch_from_redis(key)
return store['value']
state_manager = StateManager()
The StateManager
would be a singleton that manages the state of the system,
and it could then be used like this:
from state_management import state_manager # a singleton
# Simple pull from a state store
def print_loaded_plugins():
loaded = state_manager.get("LOADED_PLUGINS")
print("Loaded plugins:")
for plugin in loaded:
print(f"- {plugin}")
# Mutation of the state store, where we want to lock a specific entry
def load_plugin(plugin_name):
# Acquire a distributed lock for writing. Other workers will be able to read
# an existing value, but not write to it until the lock is released.
#
# In cases where operations need to be "atomic" like in this case, all
# operations that mutate the state should be wrapped in a lock.
with state_manager.lock_for_write("LOADED_PLUGINS"):
# Check if a plugin is already loaded
loaded = state_manager.get("LOADED_PLUGINS")
if plugin_name in loaded:
print(f"Plugin {plugin_name} is already loaded.")
return
_actually_load_plugin(plugin_name) # Some magic happens here
loaded.append(plugin_name)
# After the plugin is loaded, we update the state. This method
# will automatically publish the change to all other workers through
# Redis Pub/Sub, and since it is wrapped in a lock, race conditions
# are prevented by design
state_manager.set("LOADED_PLUGINS", loaded)