WIP-0008: Logging & Event Processing¶
Introduction¶
As a black box system, Whitebox continuously observes, stores, and processes data from a large number of events. This document proposes an implementation of an event system that allows convenient event storing and processing across all units of Whitebox's architecture.
We can separate the events into several categories:
-
Commands: Events that represent user- or system-issued commands, which trigger actions within the system with the intention of changing its state. For example:
- Start flight
- Start recording a key moment
- Load a plugin
-
Object Updates: Events that represent changes in the state of objects within the system, with the intention to know and store what changed, from what, and to what. They contain a diff of the changed objects. For example:
- Flight session created/updated/deleted
- Key moment created/updated/deleted
- Device configuration updated
- Location entry created
-
Observations: Events that represent changes in the status of the system or its components, with the intention to know and store what happened, and when. They may contain event-specific structured data. For example:
- WebSocket client connected/disconnected
- Flight started
- Key moment recording started
- Plugin loaded
- Device connected/disconnected
- Location updated
-
External Logs: Events that represent log entries generated by external services. They are free-form text messages that provide insight into the operation of client devices, and are intended to make debugging and post-mortem issue analysis easier. For example:
- Frontend console logs
- External service logs (e.g., video processing)
Problem¶
The former event architecture did not account for the auditability requirements of a black box system: events were not stored. It also did not scale properly to all edges of the system.
Proposed Implementation¶
The event system allows for any component to produce and consume events. Every event is, at minimum, timestamped and logged to a centralized event storage, after which it can be processed by any interested component.
A consumer can subscribe to specific event types and process them as they arrive. This allows for a decoupled architecture, where producers and consumers do not need to know about each other. It also allows for better scalability, as new consumers, or event types, can be added without affecting existing ones.
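The produce/subscribe flow described above can be sketched in a few lines. This is a minimal in-process illustration, not the proposed transport: the `EventBus` class, its `storage` list, and the event type names used here are assumptions made for the example.

```python
import time
from collections import defaultdict
from typing import Any, Callable

Event = dict[str, Any]
Handler = Callable[[Event], None]


class EventBus:
    """Toy in-process event bus: log every event first, then dispatch it."""

    def __init__(self) -> None:
        # Stand-in for the centralized event storage
        self.storage: list[Event] = []
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        # Consumers register interest in an event type, not in a producer
        self._subscribers[event_type].append(handler)

    def produce(self, event_type: str, data: dict[str, Any]) -> Event:
        event = {"timestamp": time.time(), "type": event_type, "data": data}
        # Every event is timestamped and stored before any consumer sees it
        self.storage.append(event)
        for handler in self._subscribers[event_type]:
            handler(event)
        return event


bus = EventBus()
seen: list[str] = []
bus.subscribe("observation.flight.started", lambda e: seen.append(e["type"]))
bus.produce("observation.flight.started", {"flight_id": 1})
bus.produce("observation.plugin.loaded", {"name": "example"})  # no subscriber
```

Both events end up in `storage`, while only the subscribed type reaches the handler. Adding a new consumer or event type does not require touching existing producers.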
Architecture¶
There are five distinct areas of the Whitebox architecture that interact with the event processing system:
-
Frontend: The user interface that allows users to interact with the system, issue commands, and view status updates. It issues command events, consumes status update events, and produces external log events.
-
Kernel: Backend server kernel that communicates with frontend via WebSockets. It takes commands from frontend and executes them in sync, or starts them in background tasks.
-
Kernel Plugin: Backend server's plugin code that handles business logic. It produces and consumes all types of events.
-
Plugin Daemon: Long-running plugin's service process. It produces and consumes status update events, and can issue command events.
-
Background Task: Short-lived background task that performs specific purpose operations. It produces status update events.
Kernel and kernel plugins are part of the same Django application, and the split is only logical, to illustrate the different roles they play.
Channels is used to implement the event transport layer, with Redis as the message broker. Redis can handle a large number of messages, which allows the transport to scale well. Every area of the system interacts with the event system in its own way:
-
Frontend: Uses WebSockets to connect to kernel and send/receive events in real-time. Kernel pushes only events that frontend has subscribed to, with a unique identifier of that frontend session (page load).
-
Kernel: Allows frontend to connect via WebSockets, and uses Channels to communicate with other backend components. It acts as a bridge between frontend and backend components, and tags all messages coming in from frontend with a unique identifier. Stores all events in the centralized event database.
-
Kernel Plugin: Uses Channels to send events. Receives events through callbacks registered for specific event types.
-
Plugin Daemon: Uses Channels to send events. Receives events through callbacks registered for specific event types, using a Channels Consumer that's running on a separate thread.
-
Background Task: Uses Channels to send events. Does not receive events.
Channels Transport Implementation¶
Channels introduces several concepts that we're using here:
-
Channel name: A named mailbox to which messages can be sent. In our case, every event receiver has its own channel name.
-
Channel group: A group can be subscribed to by channel names, which can then all be sent messages at once. In our case, a group represents an event type, and all channel names that are interested in that event type can subscribe to the group.
-
Consumer: An instance associated with a WebSocket connection.
-
channel_layer: The main Channels interface that allows sending messages to channel names or groups.
When a component wants to receive events of a specific type, it creates a channel name and subscribes it to the corresponding event type group by calling channel_layer.group_add(event_type, channel_name).
In Redis, this creates a sorted set (ZSET) for every channel group, containing all channel names that are subscribed to it. Every entry is assigned a score representing the subscription timestamp, making the system automatically drop stale subscriptions after a timeout. Internally, this translates to:
# Group subscription
ZADD asgi:group:status.flight.start 1767606407 "channel_name_1"
ZADD asgi:group:status.flight.start 1767606408 "channel_name_2"
# When looking up all subscriptions for an event type
ZRANGE asgi:group:status.flight.start 0 -1 WITHSCORES
# returns:
1) "channel_name_1"
2) 1767606407
3) "channel_name_2"
4) 1767606408
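The effect of scoring each subscription by its timestamp can be modeled without Redis. The sketch below is an in-memory stand-in, not channels-redis code: `GroupRegistry` is a hypothetical class, and the 86400-second window is assumed to match the channels-redis default `group_expiry`.

```python
GROUP_EXPIRY = 86400  # seconds; assumed to mirror the channels-redis default


class GroupRegistry:
    """In-memory stand-in for the per-group sorted sets."""

    def __init__(self) -> None:
        # group name -> {channel name: subscription timestamp (the ZSET score)}
        self._groups: dict[str, dict[str, float]] = {}

    def group_add(self, group: str, channel: str, now: float) -> None:
        # Like ZADD: re-adding an existing member just refreshes its score
        self._groups.setdefault(group, {})[channel] = now

    def members(self, group: str, now: float) -> list[str]:
        # Drop subscriptions that fell out of the expiry window, then list
        # the remaining ones in score order, as ZRANGE would
        cutoff = now - GROUP_EXPIRY
        current = self._groups.get(group, {})
        live = {c: ts for c, ts in current.items() if ts > cutoff}
        self._groups[group] = live
        return sorted(live, key=lambda c: (live[c], c))


registry = GroupRegistry()
registry.group_add("status.flight.start", "channel_name_1", now=1767606407)
registry.group_add("status.flight.start", "channel_name_2", now=1767606408)
```

A practical consequence is that a long-running subscriber has to call `group_add` again before its entry ages out, otherwise it silently stops receiving events for that group.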
When an event is produced by calling channel_layer.group_send(event_type, message),
channels-redis looks up the subscriptions and "delivers" a message to every
"mailbox" that was subscribed to the event type. It uses a Lua script to do this
atomically, ensuring that no subscriptions are missed. Internally, this roughly
translates to:
# find all channel group subscriptions
ZRANGE asgi:group:status.flight.start 0 -1
# returns:
-> "channel_name_1" 1767606407
-> "channel_name_2" 1767606408
# deliver message to every subscribed channel name
ZADD asgi:channel:channel_name_1 CURRENT_TIMESTAMP '{"hello": "world"}'
ZADD asgi:channel:channel_name_2 CURRENT_TIMESTAMP '{"hello": "world"}'
Channel consumers do not get notified of new messages automatically. Instead, they use a combination of blocking pop (BZPOPMIN) and non-blocking pop (ZPOPMIN) to wait for new messages on their channel name whenever channel_layer.receive(channel_name) is called.
Internally, this roughly translates to:
# Get the `MESSAGE`
BZPOPMIN asgi:channel:channel_name_1 0
ZADD asgi:channel:channel_name_1!inflight MESSAGE.TIMESTAMP MESSAGE.DATA
# process the message within Python
ZPOPMIN asgi:channel:channel_name_1!inflight
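The in-flight copy protects a message from being lost if a receive is interrupted, which gives at-least-once delivery to every subscription as long as the target channel is within its capacity and the message has not expired. channels-redis drops messages in both of those cases, and the Channels channel layer specification itself only promises at-most-once delivery.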
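The fan-out and receive steps above can be modeled in memory to make the data flow concrete. This is a simplified sketch under stated assumptions: `Mailboxes` is a hypothetical class, receiving is non-blocking, and the in-flight backup, capacity limits, and message expiry are not modeled.

```python
import heapq
import itertools
from typing import Optional


class Mailboxes:
    """Toy model of per-channel mailboxes ordered by delivery timestamp."""

    def __init__(self) -> None:
        self.groups: dict[str, set[str]] = {}
        self.queues: dict[str, list[tuple[float, int, dict]]] = {}
        # Tie-breaker so that two messages with equal timestamps never
        # fall back to comparing the message dicts themselves
        self._seq = itertools.count()

    def group_add(self, group: str, channel: str) -> None:
        self.groups.setdefault(group, set()).add(channel)

    def group_send(self, group: str, message: dict, now: float) -> int:
        # One copy of the message per subscribed channel (one ZADD per mailbox)
        channels = sorted(self.groups.get(group, ()))
        for channel in channels:
            queue = self.queues.setdefault(channel, [])
            heapq.heappush(queue, (now, next(self._seq), message))
        return len(channels)

    def receive(self, channel: str) -> Optional[dict]:
        # Pop the entry with the lowest score, like ZPOPMIN; None when empty
        queue = self.queues.get(channel)
        if not queue:
            return None
        return heapq.heappop(queue)[2]


boxes = Mailboxes()
boxes.group_add("status.flight.start", "channel_name_1")
boxes.group_add("status.flight.start", "channel_name_2")
delivered = boxes.group_send("status.flight.start", {"hello": "world"}, now=1767606500.0)
```

Each subscriber reads from its own mailbox, so one slow consumer does not delay the others.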
Event Storage¶
All events are stored in a centralized event database within a single table. Command, object update, and observation events are stored using the activity stream pattern for easier auditability and replayability, while external log events are stored as simple log entries.
Every event has the following fields:
- id: Unique identifier for the event
- timestamp: Timestamp when the event was created
- source: Source of the event (e.g., frontend, kernel plugin, plugin daemon)
- event_type: Type of the event: command, object update, observation, or external log
- actor: (Optional) Identifier of the actor that produced the event
- action: (Optional) Action performed by the event
- object: (Optional) Object on which the action was performed
- data: (Optional) Additional data related to the event
- summary: (Optional) Short summary of the event for easier searching
actor and object fields are generic foreign keys
to any object within the system, allowing for flexible referencing of
different entities.
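As a rough illustration of the field list above, the record could be modeled as a dataclass. This is a sketch, not the proposed Django model: `EventRecord` is a hypothetical name, and `actor` and `object` are plain strings here rather than generic foreign keys.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class EventRecord:
    # Required fields
    id: str
    timestamp: str
    source: str
    event_type: str
    # Optional fields (simplified to strings and a dict for this sketch)
    actor: Optional[str] = None
    action: Optional[str] = None
    object: Optional[str] = None
    data: Optional[dict[str, Any]] = None
    summary: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        # Omit optional fields that were never set
        return {k: v for k, v in asdict(self).items() if v is not None}


end_command = EventRecord(
    id="1235",
    timestamp="1970-01-01T01:00:00Z",
    source="frontend:SESSION_UNIQUE_ID",
    event_type="COMMAND",
    actor="USER",
    action="command.flight.end",
    object="FLIGHT_SESSION",
)
```

Calling `end_command.to_dict()` yields only the seven populated fields, which keeps serialized events compact when optional fields are unused.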
Below is the proposed way to store different event types.
Command Types¶
Command events are stored using the activity stream pattern, with their fields representing the command's context. They indicate the issuer's intent at the time the command was issued.
For example, a user may issue a command to start a flight, and later end the flight. The start command would be stored as:
{
  "id": "1234",
  "timestamp": "1970-01-01T00:00:00Z",
  "source": "frontend:SESSION_UNIQUE_ID",
  "event_type": "COMMAND",
  "actor": "USER",
  "action": "command.flight.start",
  "data": {
    "takeoff_location": "INI",
    "arrival_location": "BEG",
    "waypoints": []
  }
}
As the flight start is requested, without it actually existing yet, the object field is not used. However, when the flight end command is issued, the object field will be used to reference the flight that was started earlier:
{
  "id": "1235",
  "timestamp": "1970-01-01T01:00:00Z",
  "source": "frontend:SESSION_UNIQUE_ID",
  "event_type": "COMMAND",
  "actor": "USER",
  "action": "command.flight.end",
  "object": "FLIGHT_SESSION"
}
Object Update Types¶
Object update events are also stored using the activity stream pattern, with their fields representing the context of the update. They indicate what changed within the system.
Updates stemming from commands¶
After a command to start a flight is issued and processed, an object update event is produced, indicating that the flight has started:
{
  "id": "1236",
  "timestamp": "1970-01-01T00:00:01Z",
  "source": "frontend:SESSION_UNIQUE_ID",
  "event_type": "OBJECT_UPDATE",
  "actor": "USER",
  "action": "object.flight.started",
  "object": "FLIGHT_SESSION",
  "data": {
    "new": {
      ...dict dump of the `FlightSession` object
    }
  }
}
In this case, the source and actor fields are the same as in the command
event. Even if the event is produced by a different component (e.g., a kernel
plugin), the two fields remain the same, as they represent the original source
of the event and allow for easier tracing.
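One way to keep source and actor stable across components is to copy them from the originating command when building the follow-up event. The helper below is a hypothetical sketch: `derive_object_update` is not an existing function, and `id` and `timestamp` are assumed to be assigned later by the event storage.

```python
from typing import Any, Optional


def derive_object_update(
    command: dict[str, Any],
    action: str,
    obj: str,
    new: dict[str, Any],
    old: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build an object update event that inherits the command's provenance."""
    data: dict[str, Any] = {"new": new}
    if old is not None:
        data = {"old": old, "new": new}
    return {
        # Copied from the command, regardless of which component runs this
        "source": command["source"],
        "actor": command["actor"],
        "event_type": "OBJECT_UPDATE",
        "action": action,
        "object": obj,
        "data": data,
    }


start_command = {
    "source": "frontend:SESSION_UNIQUE_ID",
    "actor": "USER",
    "action": "command.flight.start",
}
update = derive_object_update(
    start_command, "object.flight.started", "FLIGHT_SESSION", new={"id": 1}
)
```

Because the provenance is copied rather than recomputed, filtering stored events by `source` returns both the command and everything it caused.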
The data field contains the old and new state of the flight session object.
Because the flight session was just created, only the new state is present above.
Similarly, when the flight session end is requested, a diff is stored in the
object update event:
{
  ...
  "object": "FLIGHT_SESSION",
  "data": {
    "old": {
      "ended_at": null
    },
    "new": {
      "ended_at": "1970-01-01T06:00:00Z"
    }
  }
}
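A diff of this shape can be computed from two snapshots of an object. The following is a minimal sketch assuming both snapshots are flat dictionaries; `diff_states` and the `name` field are hypothetical and used only for illustration.

```python
from typing import Any


def diff_states(old: dict[str, Any], new: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return only the keys whose values differ, split into old and new.

    A key missing from one snapshot is treated as None in that snapshot.
    """
    keys = sorted(old.keys() | new.keys())
    changed = [k for k in keys if old.get(k) != new.get(k)]
    return {
        "old": {k: old.get(k) for k in changed},
        "new": {k: new.get(k) for k in changed},
    }


before = {"name": "Flight 1", "ended_at": None}
after = {"name": "Flight 1", "ended_at": "1970-01-01T06:00:00Z"}
flight_diff = diff_states(before, after)
```

Unchanged fields such as `name` are left out, so the stored event carries only what actually changed.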
Non-user initiated updates¶
Other object updates that are not directly stemming from user-issued commands can also be stored in the same manner. For example, when a location entry is updated automatically by the system, an object update event would be stored:
{
  "id": "1237",
  "timestamp": "1970-01-01T02:00:00Z",
  "source": "plugin:stratux",
  "event_type": "OBJECT_UPDATE",
  "actor": "SYSTEM",
  "action": "object.location.updated",
  "object": "LOCATION_ENTRY",
  "data": {
    "new": {
      "latitude": -80.386480,
      "longitude": 20.651491
    }
  }
}
Observation Types¶
Observation events are also stored using the activity stream pattern, with their fields representing the observation's context. They indicate what happened within the system.
For example, when a WebSocket client connects to the kernel, an observation event would be stored:
{
  "id": "1238",
  "timestamp": "1970-01-01T03:00:00Z",
  "source": "kernel",
  "event_type": "OBSERVATION",
  "action": "observation.websocket.client_connected",
  "data": {
    "client_id": "SESSION_UNIQUE_ID",
    "ip_address": "10.42.0.42"
  }
}
Similarly, when a connection to a device is established over Wi-Fi, an observation event would be stored:
{
  "id": "1239",
  "timestamp": "1970-01-01T04:00:00Z",
  "source": "plugin:device-insta360",
  "event_type": "OBSERVATION",
  "action": "observation.device.connected",
  "object": "DEVICE_CONNECTION",
  "data": {
    "connection_type": "WIFI",
    "interface": "wlan0"
  }
}
External Log Types¶
External log events are stored as simple log entries, with their fields representing the log's context. They contain a free-form text message and are intended for developers, for easier debugging and post-mortem analysis.
For example, when the frontend application logs a message, an external log event would be stored:
{
  "id": "1240",
  "timestamp": "1970-01-01T05:00:00Z",
  "source": "frontend:SESSION_UNIQUE_ID",
  "event_type": "EXTERNAL_LOG",
  "data": {
    "log_level": "ERROR",
    "message": "Failed to load flight data"
  }
}
Wiring It All Together¶
Frontend¶
The frontend application uses WebSockets to connect to the kernel, and sends command events as they are issued by the user. It also subscribes to observation events to receive real-time status updates.
When connecting to the WebSocket, the frontend first identifies itself with a unique session ID:
const sessionId = generateUniqueSessionId();
const websocket = new WebSocket(path);
websocket.onopen = () => {
  const identifyEvent = {
    type: "identify",
    data: {
      session_id: sessionId
    }
  };
  websocket.send(JSON.stringify(identifyEvent));
};
This allows the kernel to tag all events coming from this frontend session.
Frontend can subscribe to specific event types to receive real-time updates. For example, to subscribe to flight status updates:
const subscribeEvent = {
  type: "subscribe",
  data: {
    event_types: ["status.flight.started", "status.flight.ended"]
  }
};
websocket.send(JSON.stringify(subscribeEvent));
When a command is issued, the frontend sends a command event to the kernel:
const commandEvent = {
  type: "command.flight.start",
  data: {
    takeoff_location: "INI",
    arrival_location: "BEG",
    waypoints: []
  }
};
websocket.send(JSON.stringify(commandEvent));
Kernel will process the command and produce the corresponding object update and observation events, which will later be echoed back to the frontend if it has subscribed to them.
When a log event takes place in the frontend, it sends an external log event to the kernel:
const logEvent = {
  type: "log",
  data: {
    log_level: "ERROR",
    message: "Failed to load flight data"
  }
};
websocket.send(JSON.stringify(logEvent));
Kernel & Kernel Plugins¶
The kernel, in this case, acts as a bridge between the frontend and the event processing core, and allows frontend to connect via WebSockets. It takes events from frontend, tags them with the session ID, and stores them in the event database. It allows kernel plugins to act upon these events.
When frontend sends a log event, it just gets stored for human auditing.
When frontend sends a command event, the kernel executes the appropriate event
handlers registered by kernel plugins. For example, when a command.flight.start
event is received, the kernel plugin responsible for flight sessions handles
it. The plugin relies on reversion to track object changes automatically with
the appropriate tagging, producing the corresponding object update events.
Similarly, observation events produced by kernel plugins are also stored in the event database.
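The tagging step performed by the kernel might look roughly like the following. This is a sketch under assumptions: `to_stored_event` is a hypothetical helper, the actor is hard-coded to "USER", and only the "log" and "command.*" message types shown in the frontend examples are handled.

```python
from typing import Any


def to_stored_event(message: dict[str, Any], session_id: str) -> dict[str, Any]:
    """Tag a raw frontend message with its session and shape it for storage."""
    source = f"frontend:{session_id}"
    message_type = message["type"]
    if message_type == "log":
        # Log messages are stored as-is for human auditing
        return {
            "source": source,
            "event_type": "EXTERNAL_LOG",
            "data": message["data"],
        }
    if message_type.startswith("command."):
        # The message type doubles as the stored action name
        return {
            "source": source,
            "event_type": "COMMAND",
            "actor": "USER",
            "action": message_type,
            "data": message.get("data", {}),
        }
    raise ValueError(f"Unsupported frontend message type: {message_type!r}")


stored = to_stored_event(
    {"type": "command.flight.start", "data": {"waypoints": []}},
    session_id="SESSION_UNIQUE_ID",
)
```

Keeping this mapping in one place means the frontend never sets `source` itself, so a client cannot impersonate another session.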
Subscriptions are managed using channel groups. For example:
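import json

from channels.generic.websocket import AsyncWebsocketConsumer


class Consumer(AsyncWebsocketConsumer):
    ...
    async def receive(self, text_data=None, bytes_data=None):
        # Frontend messages arrive as JSON text frames
        event = json.loads(text_data)
        if event["type"] == "subscribe":
            event_types = event["data"]["event_types"]
            for event_type in event_types:
                await self.channel_layer.group_add(event_type, self.channel_name)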
When an event is produced by a kernel plugin, it gets sent to the appropriate channel group:
event_type = "status.flight.started"
await channel_layer.group_send(
    event_type,
    {
        "type": event_type,
        "flight_id": flight.id,
    },
)
Note that event_type is supplied twice. The first occurrence is the group name,
which routes the event to all subscribers. The second is the type carried within
the message itself, which tells the consumer how to handle it.
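On the receiving side, a Channels consumer picks the handler method from the message's type by replacing dots with underscores, so a "status.flight.started" message is handled by a method named status_flight_started. The sketch below reimplements that naming rule in plain Python for illustration; `handler_name_for` and `ToyConsumer` are hypothetical and are not part of Channels.

```python
def handler_name_for(message: dict) -> str:
    """Map a message type such as 'a.b.c' to a method name 'a_b_c'."""
    if "type" not in message:
        raise ValueError("Message has no 'type' key")
    name = message["type"].replace(".", "_")
    if name.startswith("_"):
        raise ValueError("Malformed message type (leading underscore)")
    return name


class ToyConsumer:
    """Hypothetical consumer; only the dispatch rule is modeled."""

    def __init__(self) -> None:
        self.handled: list[int] = []

    def status_flight_started(self, message: dict) -> None:
        self.handled.append(message["flight_id"])

    def dispatch(self, message: dict) -> None:
        # Look up the handler by its derived name and call it
        getattr(self, handler_name_for(message))(message)


consumer = ToyConsumer()
consumer.dispatch({"type": "status.flight.started", "flight_id": 7})
```

This is why dotted event type names can be reused directly as the message type: each one corresponds to exactly one handler method on the consumer.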
Plugin Daemons¶
Plugin daemons run a Channels Consumer on a separate thread, allowing them to receive events in real-time. They can subscribe to specific event types and process them as they arrive.
An example (and very simplified) implementation:
import asyncio
import threading

from django.core.management.base import BaseCommand
from channels.layers import get_channel_layer


class Command(BaseCommand):
    async def run_loop(self):
        channel_layer = get_channel_layer()
        # Create this daemon's own mailbox
        channel_name = await channel_layer.new_channel()

        events_to_subscribe = [
            "command.flight.start",
            "command.flight.end",
        ]
        # Subscribe the mailbox to one group per event type
        for event_type in events_to_subscribe:
            await channel_layer.group_add(event_type, channel_name)

        while True:
            # Wait for a new message using blocking pop (BZPOPMIN)
            message = await channel_layer.receive(channel_name)
            # Process the message

    def handle(self, *args, **options):
        # Run the event loop on a separate thread
        loop_thread = threading.Thread(target=asyncio.run, args=(self.run_loop(),))
        loop_thread.start()
        # Run other daemon logic here
Background Tasks¶
Background tasks can produce events using Channels, but they do not receive events, as they are short-lived and do not need to process events in real-time.
An example of producing an observation event from a background task:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


def run_background_task():
    channel_layer = get_channel_layer()
    event_type = "observation.sunrise"
    # group_send is a coroutine, so wrap it to call it from sync code
    async_to_sync(channel_layer.group_send)(
        event_type,
        {
            "type": event_type,
            "sun_color": "orange",
        },
    )
Reversion¶
To track object changes automatically, models are registered with the reversion
library for change tracking. Within a revision, every object change is
automatically tracked, and after the revision is committed, an object update
event is produced for every changed object.
Revisions require transactions to work properly, so every change in the system should be wrapped in a transaction. However, at the time of writing, Django's ORM does not support transactions in async code, so care must be taken to ensure that all database operations are performed synchronously within a transaction.