| 中文 | English |
| Component Tutorial | 组件开发教程 |
| Multiprocessing Tutorial | 多进程教程 |
| Event Mode Tutorial | 事件模式教程 |
| Logging Tutorial | 日志使用 |
This tutorial introduces the Event Mode in Cellium, which is a communication approach based on the Publish-Subscribe pattern. Unlike Command Mode, Event Mode enables more flexible decoupled communication between components.
“In Cellium, events are a more flexible way to connect—you only need to focus on ‘what happened’, while the event bus handles propagation, routing, and responses.”
The core concept of Event Mode is that Publishers and Subscribers do not need to reference each other directly, but instead communicate through an Event Bus. When a publisher triggers an event, all handlers subscribed to that event will receive the notification and execute.
The advantages of this pattern include:
Event Mode is particularly suitable for the following scenarios:
Cellium provides a global Event Bus, exported through the app.core.bus module. To use Event Mode, you first need to import the relevant APIs:
from app.core.bus import event_bus, on, emit, EventBus
The Event Bus is the core of the entire Event Mode, responsible for managing event registration, triggering, and dispatching. The following are the main functions of the Event Bus:
from app.core.bus import event_bus
# Use the global Event Bus directly
event_bus.on("user.login", user_login_handler)
event_bus.emit("user.login", {"username": "admin"})
Cellium uses strings to identify event types, rather than enums. This design provides greater flexibility and supports dynamic event names and pattern matching:
# Static event names
"user.login"
# Namespaced event names
"user.login"
"user.logout"
"admin.created"
# Pattern-matched event names
"user.*"
"*.login"
"user.#.created"
Events can carry arbitrary data, passed through keyword arguments:
# Pass data when triggering an event
event_bus.emit("user.login", username="admin", role="administrator", login_time="2024-01-01")
# Handler receives data
@on("user.login")
def handle_user_login(**kwargs):
print(f"User {kwargs.get('username')} has logged in")
Static event subscription is the most basic form of event subscription, using the @on() or @event() decorator to mark handler functions.
The @on() decorator is the simplest event subscription method, accepting an event name as a parameter:
from app.core.bus import on
@on("user.login")
def handle_user_login(**kwargs):
"""User login event handler"""
username = kwargs.get("username", "Unknown User")
print(f"User {username} has logged in")
print(f"Login time: {kwargs.get('login_time', 'Unknown')}")
# Can also be written this way (same effect)
@on("user.logout")
def handle_user_logout(**kwargs):
"""User logout event handler"""
username = kwargs.get("username", "Unknown User")
print(f"User {username} has logged out")
The @event() decorator is an alias for the @on() decorator, providing the same functionality but semantically emphasizing that this is an event handler:
from app.core.bus import event
@event("order.created")
def handle_order_created(**kwargs):
"""Order created event handler"""
order_id = kwargs.get("order_id")
amount = kwargs.get("amount")
print(f"New order created: Order ID={order_id}, Amount={amount}")
Handlers decorated with @on() or @event() are automatically registered with the Event Bus. Sometimes you need to register manually, and the Event Bus also provides a register() method:
from app.core.bus import event_bus
def custom_handler(**kwargs):
"""Manually registered event handler"""
print(f"Received event data: {kwargs}")
# Manual registration
event_bus.register("custom.event", custom_handler)
# Or use chained calls
event_bus.register("event.a", handler_a).register("event.b", handler_b)
Event handlers execute in registration order. If you need to control the execution order, you can use the priority parameter:
from app.core.bus import on, EventPriority
@on("user.login", priority=EventPriority.HIGH)
def high_priority_handler(**kwargs):
"""High priority handler, executes first"""
print("1. High priority handler")
@on("user.login", priority=EventPriority.NORMAL)
def normal_priority_handler(**kwargs):
"""Normal priority handler"""
print("2. Normal priority handler")
@on("user.login", priority=EventPriority.LOW)
def low_priority_handler(**kwargs):
"""Low priority handler, executes last"""
print("3. Low priority handler")
Execution Result:
1. High priority handler
2. Normal priority handler
3. Low priority handler
Sometimes you only need to listen to an event once, and the handler should not be called after the event triggers. The @once() or @event_once() decorator can achieve this.
from app.core.bus import once
@once("app.startup")
def handle_app_startup(**kwargs):
"""App startup event, executes only once"""
print("App startup initialization")
# Perform one-time initialization operations
@once("config.loaded")
def handle_config_loaded(**kwargs):
"""Config loaded event, executes only once"""
print(f"Config loaded: {kwargs.get('config_data')}")
@event_once() is an alias for @once(), with identical functionality:
from app.core.bus import event_once
@event_once("message.received")
def handle_first_message(**kwargs):
"""Process only the first message"""
print(f"Received first message: {kwargs.get('message')}")
One-time events are suitable for the following scenarios:
from app.core.bus import once, emit
@once("database.connected")
def initialize_database(**kwargs):
"""Initialize database tables after connection"""
print("Initializing database tables...")
@once("user.first_login")
def show_welcome(**kwargs):
"""Show welcome screen on user's first login"""
print(f"Welcome {kwargs.get('username')} on your first login!")
# Trigger events
emit("database.connected", driver="mysql", host="localhost")
emit("user.first_login", username="new_user")
Execution Result:
Initializing database tables...
Welcome new_user on your first login!
If these events are triggered again, the handlers will not be called.
Pattern matching allows you to use wildcards to subscribe to a category of events, rather than a single specific event. This is very useful when you need to listen to multiple related events.
The @pattern() decorator supports various wildcard patterns:
| Wildcard | Meaning | Example |
|---|---|---|
* |
Matches a single word | user.* matches user.login, user.logout |
# |
Matches multiple words (zero or more) | order.#.created matches order.created, order.item.created |
? |
Matches a single character | user.? matches user.a, user.1 |
from app.core.bus import pattern
@pattern("user.*")
def handle_user_events(**kwargs):
"""Handle all user-related events"""
event_name = kwargs.get("_event_name")
print(f"Received user event: {event_name}")
print(f"Event data: {kwargs}")
@pattern("order.#.created")
def handle_order_created_events(**kwargs):
"""Handle all order creation events"""
event_name = kwargs.get("_event_name")
print(f"Received order creation event: {event_name}")
from app.core.bus import pattern, emit
@pattern("notification.*")
def handle_notifications(**kwargs):
"""Handle all notification events"""
event_name = kwargs.get("_event_name")
notification_type = event_name.split(".")[1]
print(f"Notification type: {notification_type}")
print(f"Message: {kwargs.get('message')}")
# Trigger various notification events
emit("notification.email", message="You have a new email")
emit("notification.sms", message="Your verification code is 123456")
emit("notification.push", message="Someone liked your post")
Execution Result:
Notification type: email
Message: You have a new email
Notification type: sms
Message: Your verification code is 123456
Notification type: push
Message: Someone liked your post
from app.core.bus import pattern, emit
@pattern("system.#.error")
def handle_system_errors(**kwargs):
"""Handle all system error events"""
event_name = kwargs.get("_event_name")
error_code = kwargs.get("error_code")
error_msg = kwargs.get("error_message")
print(f"System error [{event_name}]: Code={error_code}, Message={error_msg}")
# Trigger system error events at different levels
emit("system.disk.error", error_code=1001, error_message="Disk space insufficient")
emit("system.network.error", error_code=2001, error_message="Network connection failed")
emit("system.error", error_code=0001, error_message="Unknown error")
Execution Result:
System error [system.disk.error]: Code=1001, Message=Disk space insufficient
System error [system.network.error]: Code=2001, Message=Network connection failed
System error [system.error]: Code=0001, Message=Unknown error
Wildcard subscription can listen to all events, which is very useful for implementing logging, debugging, or global event handling.
The @wildcard() decorator subscribes to all events, regardless of the event name:
from app.core.bus import wildcard
@wildcard()
def log_all_events(**kwargs):
"""Log all events"""
event_name = kwargs.get("_event_name")
event_data = {k: v for k, v in kwargs.items() if k != "_event_name"}
print(f"[Event Log] {event_name}: {event_data}")
Wildcard handlers can also set priority:
from app.core.bus import wildcard, on, EventPriority
@wildcard(priority=EventPriority.LOW)
def event_logger(**kwargs):
"""Event logging (low priority, executes last)"""
event_name = kwargs.get("_event_name")
print(f"[Log] Event {event_name} processed")
@on("user.login", priority=EventPriority.HIGH)
def user_login_high_priority(**kwargs):
"""User login handler (high priority)"""
print(f"[High Priority] User login: {kwargs.get('username')}")
from app.core.bus import wildcard, on, emit, EventPriority
@wildcard(priority=EventPriority.LOW)
def debug_all_events(**kwargs):
"""Log all events in debug mode"""
event_name = kwargs.get("_event_name")
print(f"[Debug] Event: {event_name}")
@on("data.updated")
def handle_data_updated(**kwargs):
"""Data update event"""
print(f"[Business] Data updated: {kwargs.get('data_id')}")
@on("view.opened")
def handle_view_opened(**kwargs):
"""View opened event"""
print(f"[Business] View opened: {kwargs.get('view_name')}")
# Trigger events
emit("data.updated", data_id="12345")
emit("view.opened", view_name="settings")
Execution Result:
[Business] Data updated: 12345
[Debug] Event: data.updated
[Business] View opened: settings
[Debug] Event: view.opened
Publishing events uses the emit() function or the @emitter() decorator.
from app.core.bus import emit
# Trigger simple events
emit("user.login", username="admin", role="administrator")
# Trigger events with complex data
emit("order.created",
order_id="ORD-2024-001",
items=[
{"product_id": "P001", "quantity": 2, "price": 99.00},
{"product_id": "P002", "quantity": 1, "price": 199.00}
],
total_amount=397.00,
customer_id="C001")
The @emitter() decorator can convert class methods into event publishers. When the method is called, it automatically triggers the corresponding event:
from app.core.bus import emitter
class UserManager:
"""User Manager"""
def __init__(self):
self.users = {}
@emitter("user.login")
def login(self, username: str, password: str) -> bool:
"""User login, triggers event on success"""
if self.authenticate(username, password):
self.users[username] = {"login_time": "2024-01-01"}
return True
return False
@emitter("user.logout")
def logout(self, username: str):
"""User logout, triggers event"""
if username in self.users:
del self.users[username]
def authenticate(self, username: str, password: str) -> bool:
"""Simple authentication logic"""
return password == "123456"
# Usage
user_manager = UserManager()
@on("user.login")
def handle_login(**kwargs):
print(f"User {kwargs.get('username')} logged in successfully")
user_manager.login("admin", "123456") # Automatically triggers "user.login" event
from app.core.bus import emit
# Use namespaces to avoid conflicts
emit("shop.order.created", order_id="ORD-001", amount=100.0)
emit("shop.order.cancelled", order_id="ORD-001", reason="User cancelled")
emit("admin.order.approved", order_id="ORD-001", approver="admin")
You can make conditional checks before publishing events:
from app.core.bus import emit, on
@on("order.status_changed")
def handle_status_change(**kwargs):
print(f"Order status changed: {kwargs.get('order_id')} -> {kwargs.get('new_status')}")
class OrderService:
def update_status(self, order_id: str, new_status: str, old_status: str):
"""Update order status, publish event when conditions are met"""
# Only publish event when status actually changes
if new_status != old_status:
emit("order.status_changed",
order_id=order_id,
old_status=old_status,
new_status=new_status,
timestamp="2024-01-01 12:00:00")
Namespaces are an important mechanism for preventing event name conflicts. Using prefixes (namespaces) can categorize and manage events.
It is recommended to use the following naming conventions:
# User-related events
"user.login"
"user.logout"
"user.register"
"user.profile.updated"
# Order-related events
"order.created"
"order.paid"
"order.shipped"
"order.delivered"
# System-related events
"system.startup"
"system.shutdown"
"system.error"
"system.config_changed"
from app.core.bus import on, emit, event
class NotificationService:
"""Notification Service Component"""
@event("notification.email")
def send_email(self, to: str, subject: str, body: str):
"""Send email notification"""
print(f"Sending email to {to}: {subject}")
return True
@event("notification.sms")
def send_sms(self, phone: str, message: str):
"""Send SMS notification"""
print(f"Sending SMS to {phone}: {message}")
return True
# Subscribe to events in a specific namespace
@on("notification.*")
def handle_notification(**kwargs):
notification_type = kwargs.get("_event_name").split(".")[1]
print(f"Received {notification_type} notification request")
When multiple components handle the same type of event, namespaces can effectively avoid conflicts:
from app.core.bus import on, emit
# Plugin A handles user login
@on("pluginA.user.login")
def plugin_a_handle_login(**kwargs):
print("[PluginA] User login handling")
# Plugin B handles user login
@on("pluginB.user.login")
def plugin_b_handle_login(**kwargs):
print("[PluginB] User login handling")
# Trigger events for both plugins
emit("pluginA.user.login", username="user1")
emit("pluginB.user.login", username="user2")
Execution Result:
[PluginA] User login handling
[PluginB] User login handling
Event handlers can be set with different priorities to ensure important event handlers execute first.
Cellium predefines three priority levels:
| Level | Constant | Description |
|---|---|---|
| High | EventPriority.HIGH |
Highest priority, executes first |
| Normal | EventPriority.NORMAL |
Default priority |
| Low | EventPriority.LOW |
Lowest priority, executes last |
from app.core.bus import on, EventPriority
@on("app.startup", priority=EventPriority.HIGH)
def early_initialization(**kwargs):
"""Early initialization, high priority"""
print("1. Early initialization")
@on("app.startup", priority=EventPriority.NORMAL)
def normal_initialization(**kwargs):
"""Normal initialization"""
print("2. Normal initialization")
@on("app.startup", priority=EventPriority.LOW)
def late_initialization(**kwargs):
"""Late initialization, low priority"""
print("3. Late initialization")
If the predefined priority levels are not enough, you can use numeric values for customization:
from app.core.bus import on, EventPriority
@on("data.process", priority=100)
def first_processor(**kwargs):
"""First handler"""
print("Processing step 1")
@on("data.process", priority=50)
def second_processor(**kwargs):
"""Second handler"""
print("Processing step 2")
@on("data.process", priority=0)
def third_processor(**kwargs):
"""Third handler"""
print("Processing step 3")
Higher values mean higher priority.
from app.core.bus import on, emit, EventPriority
@on("request.received", priority=EventPriority.HIGH)
def request_validator(**kwargs):
"""Request validation, high priority"""
print("[Validation] Checking request parameters")
# Can prevent subsequent processing if validation fails
@on("request.received", priority=EventPriority.NORMAL)
def request_handler(**kwargs):
"""Request handling"""
print("[Handling] Processing request")
@on("request.received", priority=EventPriority.LOW)
def request_logger(**kwargs):
"""Request logging, low priority"""
print("[Logging] Recording request")
emit("request.received", path="/api/users", method="GET")
Execution Result:
[Validation] Checking request parameters
[Handling] Processing request
[Logging] Recording request
The following is a comprehensive example that uses various Event Mode features:
from app.core.bus import (
on, once, pattern, wildcard, emitter,
emit, event_bus, EventPriority
)
# ==================== Event Subscriptions ====================
# Normal event subscription
@on("order.created", priority=EventPriority.HIGH)
def handle_order_created(**kwargs):
"""Order creation handling"""
order_id = kwargs.get("order_id")
amount = kwargs.get("amount")
print(f"[Order] New order created: #{order_id}, Amount: ${amount:.2f}")
@on("order.created", priority=EventPriority.NORMAL)
def handle_order_notify(**kwargs):
"""Send notification after order creation"""
order_id = kwargs.get("order_id")
print(f"[Notification] Order #{order_id} created, sending confirmation email")
# One-time event subscription
@once("app.first_start")
def first_start_handler(**kwargs):
"""App first start handling"""
print(f"[Startup] First start, executing initial configuration...")
# Pattern matching subscription
@pattern("order.#.status")
def handle_order_status_change(**kwargs):
"""Handle order status change events"""
event_name = kwargs.get("_event_name")
order_id = kwargs.get("order_id")
status = event_name.split(".")[-1]
print(f"[Order] Order #{order_id} status changed to: {status}")
# Wildcard subscription
@wildcard(priority=EventPriority.LOW)
def event_logger(**kwargs):
"""Log all events"""
event_name = kwargs.get("_event_name")
print(f"[Log] Event {event_name}")
# ==================== Event Publishing Class ====================
class OrderService:
"""Order Service"""
def __init__(self):
self.orders = {}
self.order_counter = 0
@emitter("order.created")
def create_order(self, amount: float, customer: str):
"""Create order"""
self.order_counter += 1
order_id = f"ORD-{self.order_counter:04d}"
self.orders[order_id] = {
"customer": customer,
"amount": amount,
"status": "created"
}
print(f"[Service] Created order {order_id}")
return order_id
@emitter("order.paid")
def pay_order(self, order_id: str):
"""Pay order"""
if order_id in self.orders:
self.orders[order_id]["status"] = "paid"
print(f"[Service] Order {order_id} paid")
return True
return False
@emitter("order.shipped")
def ship_order(self, order_id: str):
"""Ship order"""
if order_id in self.orders:
self.orders[order_id]["status"] = "shipped"
print(f"[Service] Order {order_id} shipped")
return True
return False
# ==================== Execution Example ====================
if __name__ == "__main__":
# Trigger one-time event
emit("app.first_start")
# Create order service
order_service = OrderService()
# Create order (will trigger order.created event)
order_id = order_service.create_order(299.00, "John Doe")
# Pay order (will trigger order.paid event)
order_service.pay_order(order_id)
# Ship order (will trigger order.shipped event)
order_service.ship_order(order_id)
Execution Result:
[Startup] First start, executing initial configuration...
[Order] New order created: #ORD-0001, Amount: $299.00
[Notification] Order #ORD-0001 created, sending confirmation email
[Service] Created order ORD-0001
[Log] Event app.first_start
[Log] Event order.created
[Service] Order ORD-0001 paid
[Order] Order #ORD-0001 status changed to: paid
[Log] Event order.paid
[Service] Order ORD-0001 shipped
[Order] Order #ORD-0001 status changed to: shipped
[Log] Event order.shipped
Event Mode and Command Mode are the two main communication methods in Cellium, each with its own characteristics and applicable scenarios.
| Feature | Command Mode | Event Mode |
|---|---|---|
| Communication | One-to-one request-response | One-to-many publish-subscribe |
| Coupling | Caller and handler coupled | Publisher and subscriber decoupled |
| Execution Timing | Synchronous call, executes immediately | Asynchronous trigger, delayed execution |
| Return Value | Has return value | Usually no return value |
| Use Case | RPC calls, method execution | Decoupled communication, broadcast notifications |
| Flexibility | Static call relationships | Dynamic subscription relationships |
| Error Handling | Throws exceptions directly | Propagates errors through error events |
Command Mode is suitable for the following scenarios:
# Need to get results immediately
class Calculator(ICell):
@property
def cell_name(self) -> str:
return "calculator"
def execute(self, command: str, *args, **kwargs):
if command == "add":
a = kwargs.get("a", 0)
b = kwargs.get("b", 0)
return a + b # Return result immediately
Event Mode is suitable for the following scenarios:
# Need to trigger multiple responses
from app.core.bus import on, emit
@on("user.registered")
def send_welcome_email(**kwargs):
print(f"Sending welcome email to {kwargs.get('email')}")
@on("user.registered")
def initialize_user_data(**kwargs):
print(f"Initializing user data: {kwargs.get('user_id')}")
@on("user.registered")
def log_registration(**kwargs):
print(f"Logging registration: {kwargs.get('user_id')}")
# One event triggers multiple handlers
emit("user.registered", user_id="U001", email="user@example.com")
In practice, both modes can be used together:
from app.core.bus import on, emitter, emit
from app.core.interface import ICell
class UserService(ICell):
"""User Service, supporting both Command Mode and Event Mode"""
@property
def cell_name(self) -> str:
return "user_service"
def execute(self, command: str, *args, **kwargs):
"""Command Mode: direct method calls"""
method_name = f"_cmd_{command}"
if hasattr(self, method_name):
method = getattr(self, method_name)
return method(*args, **kwargs)
return f"Unknown command: {command}"
def _cmd_get_user(self, user_id: str):
"""Get user info"""
return {"user_id": user_id, "name": "John Doe"}
@emitter("user.created")
def create_user(self, username: str, email: str):
"""Create user, trigger event"""
user_id = f"U{len(username)}"
print(f"Creating user: {username}")
return user_id
@emitter("user.deleted")
def delete_user(self, user_id: str):
"""Delete user, trigger event"""
print(f"Deleting user: {user_id}")
return True
# Command Mode call
service = UserService()
user_info = service.execute("get_user", user_id="U001")
# Event Mode subscription
@on("user.created")
def handle_user_created(**kwargs):
print(f"New user created: {kwargs}")
# Mixed calls
user_id = service.create_user("Jane", "jane@example.com")
emit("user.created", user_id=user_id, username="Jane", email="jane@example.com")
Use clear, meaningful names:
# ✅ Good naming
"user.login"
"order.created"
"notification.email.sent"
# ❌ Avoid
"event1"
"something_happened"
"u.l"
Use descriptive function names for event handlers:
# ✅ Good naming
@on("user.login")
def handle_user_login(**kwargs):
pass
@on("order.created")
def process_new_order(**kwargs):
pass
# ❌ Avoid
@on("user.login")
def handler(**kwargs):
pass
@on("order.created")
def func1(**kwargs):
pass
Add appropriate error handling in event handlers:
from app.core.bus import on
@on("data.process")
def process_data(**kwargs):
try:
data_id = kwargs.get("data_id")
# Data processing logic
result = heavy_processing(data_id)
return result
except Exception as e:
print(f"Data processing failed: {e}")
# Can choose to trigger error event
emit("data.process.error", data_id=data_id, error=str(e))
For high-frequency events, pay attention to performance:
from app.core.bus import on
# Use lightweight handlers for high-frequency events
@on("heartbeat")
def heartbeat_handler(**kwargs):
# Only perform minimal processing
pass
# Avoid time-consuming operations in handlers
@on("user.action")
def user_action_handler(**kwargs):
# ❌ Avoid: time-consuming operations
# time.sleep(1)
# heavy_computation()
# ✅ Recommended: move time-consuming operations to background tasks
# asyncio.create_task(heavy_task())
pass
Properly manage event subscriptions to avoid memory leaks:
from app.core.bus import event_bus, on
# Short-term subscriptions use once
@once("session.started")
def session_handler(**kwargs):
pass
# Manually manage long-term subscriptions
subscription = event_bus.on("cache.invalidated", cache_handler)
# Unsubscribe when no longer needed
subscription.unsubscribe()
# Or use context manager
with event_bus.on("temp.event", temp_handler) as subscription:
# Use subscription
pass
Event Mode is a powerful communication mechanism in Cellium that provides:
Through this tutorial, you should have mastered the core concepts and practical application techniques of Event Mode. It is recommended to compare with the Command Mode in the Component Tutorial to learn the characteristics and applicable scenarios of both communication methods, so you can make the right choices in actual development.